diff --git a/Cargo.lock b/Cargo.lock index 86057766d..0448947ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5798,6 +5798,7 @@ dependencies = [ "prost", "serde", "tokio", + "tokio-stream", "tonic 0.11.0", "tonic-build 0.11.0", "tower", diff --git a/zebra-grpc/Cargo.toml b/zebra-grpc/Cargo.toml index 7de5a505d..8249532bb 100644 --- a/zebra-grpc/Cargo.toml +++ b/zebra-grpc/Cargo.toml @@ -21,12 +21,14 @@ tonic = "0.11.0" prost = "0.12.3" serde = { version = "1.0.196", features = ["serde_derive"] } tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"] } +tokio-stream = "0.1.14" tower = { version = "0.4.13", features = ["util", "buffer"] } color-eyre = "0.6.2" zcash_primitives = { version = "0.13.0-rc.1" } zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.34", features = ["shielded-scan"] } +zebra-chain = { path = "../zebra-chain" , version = "1.0.0-beta.34" } [build-dependencies] tonic-build = "0.11.0" diff --git a/zebra-grpc/proto/scanner.proto b/zebra-grpc/proto/scanner.proto index cc6e0b73b..ed68295a4 100644 --- a/zebra-grpc/proto/scanner.proto +++ b/zebra-grpc/proto/scanner.proto @@ -22,6 +22,9 @@ service Scanner { // Submits scanning keys to the scanner. rpc RegisterKeys(RegisterKeysRequest) returns (RegisterKeysResponse); + + // Register keys and listen to the results + rpc Scan (ScanRequest) returns (stream ScanResponse); } // A response to a GetInfo call. @@ -69,13 +72,19 @@ message RegisterKeysResponse { // A result for a single key. message Results { // A height, transaction id map - map transactions = 1; + map by_height = 1; } // A vector of transaction hashes -message TransactionHash { - // A transaction id hash - repeated string hash = 1; +message Transactions { + // Transactions + repeated Transaction transactions = 1; +} + +// Transaction data +message Transaction { + // The transaction hash/id + string hash = 1; } // A scanning key with an optional birth height @@ -85,3 +94,15 @@ message KeyWithHeight { // Birth height of the key optional uint32 height = 2; } + +// A request for registering keys and getting their transactions +message ScanRequest { + // A set of viewing keys + repeated KeyWithHeight keys = 2; +} + +// Response to Scan calls +message ScanResponse { + // Results for each key. + map results = 1; +} \ No newline at end of file diff --git a/zebra-grpc/src/server.rs b/zebra-grpc/src/server.rs index 0e358e43e..3e64a2df9 100644 --- a/zebra-grpc/src/server.rs +++ b/zebra-grpc/src/server.rs @@ -1,23 +1,31 @@ //! The gRPC server implementation -use std::{collections::BTreeMap, net::SocketAddr}; +use std::{collections::BTreeMap, net::SocketAddr, pin::Pin}; use futures_util::future::TryFutureExt; +use tokio_stream::{wrappers::ReceiverStream, Stream}; use tonic::{transport::Server, Request, Response, Status}; use tower::ServiceExt; +use zebra_chain::{block::Height, transaction}; use zebra_node_services::scan_service::{ - request::Request as ScanServiceRequest, response::Response as ScanServiceResponse, + request::Request as ScanServiceRequest, + response::{Response as ScanServiceResponse, ScanResult}, }; use crate::scanner::{ scanner_server::{Scanner, ScannerServer}, ClearResultsRequest, DeleteKeysRequest, Empty, GetResultsRequest, GetResultsResponse, - InfoReply, RegisterKeysRequest, RegisterKeysResponse, Results, TransactionHash, + InfoReply, KeyWithHeight, RegisterKeysRequest, RegisterKeysResponse, Results, ScanRequest, + ScanResponse, Transaction, Transactions, }; type BoxError = Box; +/// The maximum number of messages that can be queued to be streamed to a client +/// from the `scan` method. +const SCAN_RESPONDER_BUFFER_SIZE: usize = 10_000; + #[derive(Debug)] /// The server implementation pub struct ScannerRPC @@ -42,7 +50,104 @@ where + 'static, >::Future: Send, { - async fn get_info(&self, _request: Request) -> Result, Status> { + type ScanStream = Pin> + Send>>; + + async fn scan( + &self, + request: tonic::Request, + ) -> Result, Status> { + let keys = request.into_inner().keys; + + if keys.is_empty() { + let msg = "must provide at least 1 key in scan request"; + return Err(Status::invalid_argument(msg)); + } + + let keys: Vec<_> = keys + .into_iter() + .map(|KeyWithHeight { key, height }| (key, height)) + .collect(); + + let ScanServiceResponse::RegisteredKeys(_) = self + .scan_service + .clone() + .ready() + .and_then(|service| service.call(ScanServiceRequest::RegisterKeys(keys.clone()))) + .await + .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))? + else { + return Err(Status::unknown( + "scan service returned an unexpected response", + )); + }; + + let keys: Vec<_> = keys.into_iter().map(|(key, _start_at)| key).collect(); + + let ScanServiceResponse::Results(results) = self + .scan_service + .clone() + .ready() + .and_then(|service| service.call(ScanServiceRequest::Results(keys.clone()))) + .await + .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))? + else { + return Err(Status::unknown( + "scan service returned an unexpected response", + )); + }; + + let ScanServiceResponse::SubscribeResults(mut results_receiver) = self + .scan_service + .clone() + .ready() + .and_then(|service| { + service.call(ScanServiceRequest::SubscribeResults( + keys.iter().cloned().collect(), + )) + }) + .await + .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))? + else { + return Err(Status::unknown( + "scan service returned an unexpected response", + )); + }; + + let (response_sender, response_receiver) = + tokio::sync::mpsc::channel(SCAN_RESPONDER_BUFFER_SIZE); + let response_stream = ReceiverStream::new(response_receiver); + + tokio::spawn(async move { + let initial_results = process_results(keys, results); + + let send_result = response_sender + .send(Ok(ScanResponse { + results: initial_results, + })) + .await; + + if send_result.is_err() { + // return early if the client has disconnected + return; + } + + while let Some(scan_result) = results_receiver.recv().await { + let send_result = response_sender.send(Ok(scan_result.into())).await; + + // Finish task if the client has disconnected + if send_result.is_err() { + break; + } + } + }); + + Ok(Response::new(Box::pin(response_stream))) + } + + async fn get_info( + &self, + _request: tonic::Request, + ) -> Result, Status> { let ScanServiceResponse::Info { min_sapling_birthday_height, } = self @@ -170,34 +275,74 @@ where )); }; - // If there are no results for a key, we still want to return it with empty results. - let empty_map = BTreeMap::new(); - - let results = keys - .into_iter() - .map(|key| { - let values = response.get(&key).unwrap_or(&empty_map); - - // Skip heights with no transactions, they are scanner markers and should not be returned. - let transactions = Results { - transactions: values - .iter() - .filter(|(_, transactions)| !transactions.is_empty()) - .map(|(height, transactions)| { - let txs = transactions.iter().map(ToString::to_string).collect(); - (height.0, TransactionHash { hash: txs }) - }) - .collect(), - }; - - (key, transactions) - }) - .collect::>(); + let results = process_results(keys, response); Ok(Response::new(GetResultsResponse { results })) } } +fn process_results( + keys: Vec, + results: BTreeMap>>, +) -> BTreeMap { + // If there are no results for a key, we still want to return it with empty results. + let empty_map = BTreeMap::new(); + + keys.into_iter() + .map(|key| { + let values = results.get(&key).unwrap_or(&empty_map); + + // Skip heights with no transactions, they are scanner markers and should not be returned. + let transactions = Results { + by_height: values + .iter() + .filter(|(_, transactions)| !transactions.is_empty()) + .map(|(height, transactions)| { + let transactions = transactions + .iter() + .map(ToString::to_string) + .map(|hash| Transaction { hash }) + .collect(); + (height.0, Transactions { transactions }) + }) + .collect(), + }; + + (key, transactions) + }) + .collect::>() +} + +impl From for ScanResponse { + fn from( + ScanResult { + key, + height: Height(height), + tx_id, + }: ScanResult, + ) -> Self { + ScanResponse { + results: [( + key, + Results { + by_height: [( + height, + Transactions { + transactions: [tx_id.to_string()] + .map(|hash| Transaction { hash }) + .to_vec(), + }, + )] + .into_iter() + .collect(), + }, + )] + .into_iter() + .collect(), + } + } +} + /// Initializes the zebra-scan gRPC server pub async fn init( listen_addr: SocketAddr, diff --git a/zebra-grpc/src/tests/snapshot.rs b/zebra-grpc/src/tests/snapshot.rs index 9d392c573..63f355aa9 100644 --- a/zebra-grpc/src/tests/snapshot.rs +++ b/zebra-grpc/src/tests/snapshot.rs @@ -13,12 +13,14 @@ use zebra_chain::{block::Height, parameters::Network, transaction}; use zebra_test::mock_service::MockService; use zebra_node_services::scan_service::{ - request::Request as ScanRequest, response::Response as ScanResponse, + request::Request as ScanRequest, + response::{Response as ScanResponse, ScanResult}, }; use crate::{ scanner::{ - scanner_client::ScannerClient, Empty, GetResultsRequest, GetResultsResponse, InfoReply, + self, scanner_client::ScannerClient, Empty, GetResultsRequest, GetResultsResponse, + InfoReply, KeyWithHeight, }, server::init, }; @@ -138,6 +140,85 @@ async fn test_mocked_rpc_response_data_for_network(network: Network, random_port .expect("get_results request should succeed"); snapshot_rpc_getresults(get_results_response.into_inner(), &settings); + + // snapshot the scan grpc method + + let scan_response_fut = { + let mut client = client.clone(); + let get_results_request = tonic::Request::new(scanner::ScanRequest { + keys: vec![KeyWithHeight { + key: ZECPAGES_SAPLING_VIEWING_KEY.to_string(), + height: None, + }], + }); + tokio::spawn(async move { client.scan(get_results_request).await }) + }; + + let (fake_results_sender, fake_results_receiver) = tokio::sync::mpsc::channel(1); + + { + let mut mock_scan_service = mock_scan_service.clone(); + tokio::spawn(async move { + let zec_pages_sapling_efvk = ZECPAGES_SAPLING_VIEWING_KEY.to_string(); + let mut fake_results = BTreeMap::new(); + for fake_result_height in [Height::MIN, Height(1), Height::MAX] { + fake_results.insert( + fake_result_height, + [transaction::Hash::from([0; 32])].repeat(3), + ); + } + + let mut fake_results_response = BTreeMap::new(); + fake_results_response.insert(zec_pages_sapling_efvk.clone(), fake_results); + + mock_scan_service + .expect_request_that(|req| matches!(req, ScanRequest::RegisterKeys(_))) + .await + .respond(ScanResponse::RegisteredKeys(vec![])); + + mock_scan_service + .expect_request_that(|req| matches!(req, ScanRequest::Results(_))) + .await + .respond(ScanResponse::Results(fake_results_response)); + + mock_scan_service + .expect_request_that(|req| matches!(req, ScanRequest::SubscribeResults(_))) + .await + .respond(ScanResponse::SubscribeResults(fake_results_receiver)); + }); + } + + let scan_response = scan_response_fut + .await + .expect("tokio task should join successfully") + .expect("get_results request should succeed"); + + let mut scan_response_stream = scan_response.into_inner(); + + let scan_response_message = scan_response_stream + .message() + .await + .expect("scan response message should be ok") + .expect("scan response message should be some"); + + snapshot_rpc_scan("cached", scan_response_message, &settings); + + fake_results_sender + .send(ScanResult { + key: ZECPAGES_SAPLING_VIEWING_KEY.to_string(), + height: Height::MIN, + tx_id: transaction::Hash::from([0; 32]), + }) + .await + .expect("should send fake result successfully"); + + let scan_response_message = scan_response_stream + .message() + .await + .expect("scan response message should be ok") + .expect("scan response message should be some"); + + snapshot_rpc_scan("subscribed", scan_response_message, &settings); } /// Snapshot `getinfo` response, using `cargo insta` and JSON serialization. @@ -149,3 +230,12 @@ fn snapshot_rpc_getinfo(info: InfoReply, settings: &insta::Settings) { fn snapshot_rpc_getresults(results: GetResultsResponse, settings: &insta::Settings) { settings.bind(|| insta::assert_json_snapshot!("get_results", results)); } + +/// Snapshot `scan` response, using `cargo insta` and JSON serialization. +fn snapshot_rpc_scan( + variant: &'static str, + scan_response: scanner::ScanResponse, + settings: &insta::Settings, +) { + settings.bind(|| insta::assert_json_snapshot!(format!("scan_{variant}"), scan_response)); +} diff --git a/zebra-grpc/src/tests/snapshots/get_results@mainnet.snap b/zebra-grpc/src/tests/snapshots/get_results@mainnet.snap index afba0d25d..26caa0e85 100644 --- a/zebra-grpc/src/tests/snapshots/get_results@mainnet.snap +++ b/zebra-grpc/src/tests/snapshots/get_results@mainnet.snap @@ -5,26 +5,44 @@ expression: results { "results": { "zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": { - "transactions": { + "by_height": { "0": { - "hash": [ - "0000000000000000000000000000000000000000000000000000000000000000", - "0000000000000000000000000000000000000000000000000000000000000000", - "0000000000000000000000000000000000000000000000000000000000000000" + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } ] }, "1": { - "hash": [ - "0000000000000000000000000000000000000000000000000000000000000000", - "0000000000000000000000000000000000000000000000000000000000000000", - "0000000000000000000000000000000000000000000000000000000000000000" + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } ] }, "2147483647": { - "hash": [ - "0000000000000000000000000000000000000000000000000000000000000000", - "0000000000000000000000000000000000000000000000000000000000000000", - "0000000000000000000000000000000000000000000000000000000000000000" + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } ] } } diff --git a/zebra-grpc/src/tests/snapshots/get_results@testnet.snap b/zebra-grpc/src/tests/snapshots/get_results@testnet.snap index afba0d25d..26caa0e85 100644 --- a/zebra-grpc/src/tests/snapshots/get_results@testnet.snap +++ b/zebra-grpc/src/tests/snapshots/get_results@testnet.snap @@ -5,26 +5,44 @@ expression: results { "results": { "zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": { - "transactions": { + "by_height": { "0": { - "hash": [ - "0000000000000000000000000000000000000000000000000000000000000000", - "0000000000000000000000000000000000000000000000000000000000000000", - "0000000000000000000000000000000000000000000000000000000000000000" + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } ] }, "1": { - "hash": [ - "0000000000000000000000000000000000000000000000000000000000000000", - "0000000000000000000000000000000000000000000000000000000000000000", - "0000000000000000000000000000000000000000000000000000000000000000" + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } ] }, "2147483647": { - "hash": [ - "0000000000000000000000000000000000000000000000000000000000000000", - "0000000000000000000000000000000000000000000000000000000000000000", - "0000000000000000000000000000000000000000000000000000000000000000" + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } ] } } diff --git a/zebra-grpc/src/tests/snapshots/scan_cached@mainnet.snap b/zebra-grpc/src/tests/snapshots/scan_cached@mainnet.snap new file mode 100644 index 000000000..9187f5828 --- /dev/null +++ b/zebra-grpc/src/tests/snapshots/scan_cached@mainnet.snap @@ -0,0 +1,51 @@ +--- +source: zebra-grpc/src/tests/snapshot.rs +expression: scan_response +--- +{ + "results": { + "zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": { + "by_height": { + "0": { + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } + ] + }, + "1": { + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } + ] + }, + "2147483647": { + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } + ] + } + } + } + } +} diff --git a/zebra-grpc/src/tests/snapshots/scan_cached@testnet.snap b/zebra-grpc/src/tests/snapshots/scan_cached@testnet.snap new file mode 100644 index 000000000..9187f5828 --- /dev/null +++ b/zebra-grpc/src/tests/snapshots/scan_cached@testnet.snap @@ -0,0 +1,51 @@ +--- +source: zebra-grpc/src/tests/snapshot.rs +expression: scan_response +--- +{ + "results": { + "zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": { + "by_height": { + "0": { + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } + ] + }, + "1": { + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } + ] + }, + "2147483647": { + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + }, + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } + ] + } + } + } + } +} diff --git a/zebra-grpc/src/tests/snapshots/scan_subscribed@mainnet.snap b/zebra-grpc/src/tests/snapshots/scan_subscribed@mainnet.snap new file mode 100644 index 000000000..bb964432d --- /dev/null +++ b/zebra-grpc/src/tests/snapshots/scan_subscribed@mainnet.snap @@ -0,0 +1,19 @@ +--- +source: zebra-grpc/src/tests/snapshot.rs +expression: scan_response +--- +{ + "results": { + "zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": { + "by_height": { + "0": { + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } + ] + } + } + } + } +} diff --git a/zebra-grpc/src/tests/snapshots/scan_subscribed@testnet.snap b/zebra-grpc/src/tests/snapshots/scan_subscribed@testnet.snap new file mode 100644 index 000000000..bb964432d --- /dev/null +++ b/zebra-grpc/src/tests/snapshots/scan_subscribed@testnet.snap @@ -0,0 +1,19 @@ +--- +source: zebra-grpc/src/tests/snapshot.rs +expression: scan_response +--- +{ + "results": { + "zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz": { + "by_height": { + "0": { + "transactions": [ + { + "hash": "0000000000000000000000000000000000000000000000000000000000000000" + } + ] + } + } + } + } +}