From 14463a7f5d80329b45dbd730d94d53a6f6037475 Mon Sep 17 00:00:00 2001 From: Arya Date: Tue, 16 Jul 2024 08:25:31 -0400 Subject: [PATCH] add(rpc): Add a tonic server in zebra-rpc (#8674) * adds a tonic server * Adds a test stub, moves method impls to their own modules, minor fixes. * Moves indexer rpc mod behind a feature, adds a config field for its listen address, and initializes the indexer RPC when zebrad starts * Skips tonic_build() in zebra-rpc build script unless indexer-rpcs feature is selected, simplifies indexer.proto file, makes tonic deps optional * formats zebra-rpc Cargo.toml * Adds tokio_stream dependency, adds chain_tip_change field to IndexerRPC, and implements a simple version of the chain_tip_change RPC method * passes latest chain tip to indexer::server::init from start cmd and updates vectors test * Update zebra-rpc/src/config.rs * fixes a race condition in trusted_chain_sync_handles_forks_correctly --- Cargo.lock | 5 ++ zebra-rpc/Cargo.toml | 61 +++++++++++++++++--- zebra-rpc/build.rs | 15 +++++ zebra-rpc/proto/indexer.proto | 10 ++++ zebra-rpc/src/config.rs | 19 +++++++ zebra-rpc/src/indexer.rs | 13 +++++ zebra-rpc/src/indexer/methods.rs | 57 +++++++++++++++++++ zebra-rpc/src/indexer/server.rs | 77 ++++++++++++++++++++++++++ zebra-rpc/src/indexer/tests.rs | 1 + zebra-rpc/src/indexer/tests/vectors.rs | 76 +++++++++++++++++++++++++ zebra-rpc/src/lib.rs | 3 + zebra-rpc/src/server/tests/vectors.rs | 4 ++ zebrad/Cargo.toml | 3 + zebrad/src/commands/start.rs | 33 +++++++++++ zebrad/tests/acceptance.rs | 14 +---- 15 files changed, 371 insertions(+), 20 deletions(-) create mode 100644 zebra-rpc/build.rs create mode 100644 zebra-rpc/proto/indexer.proto create mode 100644 zebra-rpc/src/indexer.rs create mode 100644 zebra-rpc/src/indexer/methods.rs create mode 100644 zebra-rpc/src/indexer/server.rs create mode 100644 zebra-rpc/src/indexer/tests.rs create mode 100644 zebra-rpc/src/indexer/tests/vectors.rs diff --git a/Cargo.lock b/Cargo.lock index dc3c4cbed..15ea0bdad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5988,11 +5988,16 @@ dependencies = [ "jsonrpc-derive", "jsonrpc-http-server", "proptest", + "prost", "rand 0.8.5", "serde", "serde_json", "thiserror", "tokio", + "tokio-stream", + "tonic 0.11.0", + "tonic-build 0.11.0", + "tonic-reflection", "tower", "tracing", "zcash_address", diff --git a/zebra-rpc/Cargo.toml b/zebra-rpc/Cargo.toml index 33a733ee4..70ed43760 100644 --- a/zebra-rpc/Cargo.toml +++ b/zebra-rpc/Cargo.toml @@ -12,10 +12,23 @@ homepage = "https://zfnd.org/zebra/" # crates.io is limited to 5 keywords and categories keywords = ["zebra", "zcash"] # Must be one of -categories = ["asynchronous", "cryptography::cryptocurrencies", "encoding", "network-programming"] +categories = [ + "asynchronous", + "cryptography::cryptocurrencies", + "encoding", + "network-programming", +] [features] +indexer-rpcs = [ + "tonic-build", + "tonic", + "tonic-reflection", + "prost", + "tokio-stream", +] + # Production features that activate extra dependencies, or extra features in dependencies # Mining RPC support @@ -41,7 +54,10 @@ proptest-impl = [ ] [dependencies] -chrono = { version = "0.4.38", default-features = false, features = ["clock", "std"] } +chrono = { version = "0.4.38", default-features = false, features = [ + "clock", + "std", +] } futures = "0.3.30" # lightwalletd sends JSON-RPC requests over HTTP 1.1 @@ -55,14 +71,26 @@ jsonrpc-http-server = "18.0.0" serde_json = { version = "1.0.120", features = ["preserve_order"] } indexmap = { version = "2.2.6", features = ["serde"] } -tokio = { version = "1.37.0", features = ["time", "rt-multi-thread", "macros", "tracing"] } +tokio = { version = "1.37.0", features = [ + "time", + "rt-multi-thread", + "macros", + "tracing", +] } tower = "0.4.13" +# indexer-rpcs dependencies +tonic = { version = "0.11.0", optional = true } +tonic-reflection = { version = "0.11.0", optional = true } +prost = { version = "0.12.6", optional = true } +tokio-stream = { version = "0.1.15", optional = true } + tracing = "0.1.39" hex = { version = "0.4.3", features = ["serde"] } serde = { version = "1.0.204", features = ["serde_derive"] } + zcash_primitives = { version = "0.15.0" } # Experimental feature getblocktemplate-rpcs @@ -73,13 +101,20 @@ zcash_address = { version = "0.3.2", optional = true } # Test-only feature proptest-impl proptest = { version = "1.4.0", optional = true } -zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = ["json-conversion"] } +zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = [ + "json-conversion", +] } zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38" } zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38" } -zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = ["rpc-client"] } +zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = [ + "rpc-client", +] } zebra-script = { path = "../zebra-script", version = "1.0.0-beta.38" } zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38" } +[build-dependencies] +tonic-build = { version = "0.11.0", optional = true } + [dev-dependencies] insta = { version = "1.39.0", features = ["redactions", "json", "ron"] } @@ -88,9 +123,17 @@ proptest = "1.4.0" thiserror = "1.0.61" tokio = { version = "1.37.0", features = ["full", "tracing", "test-util"] } -zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = ["proptest-impl"] } -zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38", features = ["proptest-impl"] } -zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38", features = ["proptest-impl"] } -zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38", features = ["proptest-impl"] } +zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = [ + "proptest-impl", +] } +zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38", features = [ + "proptest-impl", +] } +zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38", features = [ + "proptest-impl", +] } +zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38", features = [ + "proptest-impl", +] } zebra-test = { path = "../zebra-test", version = "1.0.0-beta.38" } diff --git a/zebra-rpc/build.rs b/zebra-rpc/build.rs new file mode 100644 index 000000000..75db7fd2a --- /dev/null +++ b/zebra-rpc/build.rs @@ -0,0 +1,15 @@ +//! Compile proto files + +fn main() -> Result<(), Box> { + #[cfg(feature = "indexer-rpcs")] + { + use std::{env, path::PathBuf}; + let out_dir = env::var("OUT_DIR").map(PathBuf::from); + tonic_build::configure() + .type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]") + .file_descriptor_set_path(out_dir.unwrap().join("indexer_descriptor.bin")) + .compile(&["proto/indexer.proto"], &[""])?; + } + + Ok(()) +} diff --git a/zebra-rpc/proto/indexer.proto b/zebra-rpc/proto/indexer.proto new file mode 100644 index 000000000..6ce5911ba --- /dev/null +++ b/zebra-rpc/proto/indexer.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; +package zebra.indexer.rpc; + +// Used by methods that take no arguments. +message Empty {}; + +service Indexer { + // Notifies listeners of chain tip changes + rpc ChainTipChange(Empty) returns (stream Empty); +} \ No newline at end of file diff --git a/zebra-rpc/src/config.rs b/zebra-rpc/src/config.rs index 3f74ead07..8dc675b20 100644 --- a/zebra-rpc/src/config.rs +++ b/zebra-rpc/src/config.rs @@ -30,6 +30,22 @@ pub struct Config { /// They can also query your node's state. pub listen_addr: Option, + /// IP address and port for the indexer RPC server. + /// + /// Note: The indexer RPC server is disabled by default. + /// To enable the indexer RPC server, compile `zebrad` with the + /// `indexer` feature flag and set a listen address in the config: + /// ```toml + /// [rpc] + /// indexer_listen_addr = '127.0.0.1:8230' + /// ``` + /// + /// # Security + /// + /// If you bind Zebra's indexer RPC port to a public IP address, + /// anyone on the internet can query your node's state. + pub indexer_listen_addr: Option, + /// The number of threads used to process RPC requests and responses. /// /// Zebra's RPC server has a separate thread pool and a `tokio` executor for each thread. @@ -65,6 +81,9 @@ impl Default for Config { // Disable RPCs by default. listen_addr: None, + // Disable indexer RPCs by default. + indexer_listen_addr: None, + // Use a single thread, so we can detect RPC port conflicts. #[cfg(not(feature = "getblocktemplate-rpcs"))] parallel_cpu_threads: 1, diff --git a/zebra-rpc/src/indexer.rs b/zebra-rpc/src/indexer.rs new file mode 100644 index 000000000..9f4c69312 --- /dev/null +++ b/zebra-rpc/src/indexer.rs @@ -0,0 +1,13 @@ +//! A tonic RPC server for Zebra's indexer API. + +#[cfg(test)] +mod tests; + +pub mod methods; +pub mod server; + +// The generated indexer proto +tonic::include_proto!("zebra.indexer.rpc"); + +pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("indexer_descriptor"); diff --git a/zebra-rpc/src/indexer/methods.rs b/zebra-rpc/src/indexer/methods.rs new file mode 100644 index 000000000..26b3b0ead --- /dev/null +++ b/zebra-rpc/src/indexer/methods.rs @@ -0,0 +1,57 @@ +//! Implements `Indexer` methods on the `IndexerRPC` type + +use std::pin::Pin; + +use futures::Stream; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Response, Status}; +use tower::BoxError; + +use zebra_chain::chain_tip::ChainTip; + +use super::{indexer_server::Indexer, server::IndexerRPC, Empty}; + +/// The maximum number of messages that can be queued to be streamed to a client +const RESPONSE_BUFFER_SIZE: usize = 10_000; + +#[tonic::async_trait] +impl Indexer for IndexerRPC +where + ReadStateService: tower::Service< + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = BoxError, + > + Clone + + Send + + Sync + + 'static, + >::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, +{ + type ChainTipChangeStream = Pin> + Send>>; + + async fn chain_tip_change( + &self, + _: tonic::Request, + ) -> Result, Status> { + let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE); + let response_stream = ReceiverStream::new(response_receiver); + let mut chain_tip_change = self.chain_tip_change.clone(); + + tokio::spawn(async move { + // Notify the client of chain tip changes until the channel is closed + while let Ok(()) = chain_tip_change.best_tip_changed().await { + let tx = response_sender.clone(); + tokio::spawn(async move { tx.send(Ok(Empty {})).await }); + } + + let _ = response_sender + .send(Err(Status::unavailable( + "chain_tip_change channel has closed", + ))) + .await; + }); + + Ok(Response::new(Box::pin(response_stream))) + } +} diff --git a/zebra-rpc/src/indexer/server.rs b/zebra-rpc/src/indexer/server.rs new file mode 100644 index 000000000..fcd3a3ac6 --- /dev/null +++ b/zebra-rpc/src/indexer/server.rs @@ -0,0 +1,77 @@ +//! A tonic RPC server for Zebra's indexer API. + +use std::net::SocketAddr; + +use tokio::task::JoinHandle; +use tonic::transport::{server::TcpIncoming, Server}; +use tower::BoxError; +use zebra_chain::chain_tip::ChainTip; + +use super::indexer_server::IndexerServer; + +type ServerTask = JoinHandle>; + +/// Indexer RPC service. +pub struct IndexerRPC +where + ReadStateService: tower::Service< + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = BoxError, + > + Clone + + Send + + Sync + + 'static, + >::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, +{ + _read_state: ReadStateService, + pub(super) chain_tip_change: Tip, +} + +/// Initializes the indexer RPC server +pub async fn init( + listen_addr: SocketAddr, + _read_state: ReadStateService, + chain_tip_change: Tip, +) -> Result<(ServerTask, SocketAddr), BoxError> +where + ReadStateService: tower::Service< + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = BoxError, + > + Clone + + Send + + Sync + + 'static, + >::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, +{ + let indexer_service = IndexerRPC { + _read_state, + chain_tip_change, + }; + + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(crate::indexer::FILE_DESCRIPTOR_SET) + .build() + .unwrap(); + + tracing::info!("Trying to open indexer RPC endpoint at {}...", listen_addr,); + + let tcp_listener = tokio::net::TcpListener::bind(listen_addr).await?; + let listen_addr = tcp_listener.local_addr()?; + let incoming = TcpIncoming::from_listener(tcp_listener, true, None)?; + + let server_task: JoinHandle> = tokio::spawn(async move { + Server::builder() + .add_service(reflection_service) + .add_service(IndexerServer::new(indexer_service)) + .serve_with_incoming(incoming) + .await?; + + Ok(()) + }); + + Ok((server_task, listen_addr)) +} diff --git a/zebra-rpc/src/indexer/tests.rs b/zebra-rpc/src/indexer/tests.rs new file mode 100644 index 000000000..ceba02b10 --- /dev/null +++ b/zebra-rpc/src/indexer/tests.rs @@ -0,0 +1 @@ +mod vectors; diff --git a/zebra-rpc/src/indexer/tests/vectors.rs b/zebra-rpc/src/indexer/tests/vectors.rs new file mode 100644 index 000000000..2d5f1d8f3 --- /dev/null +++ b/zebra-rpc/src/indexer/tests/vectors.rs @@ -0,0 +1,76 @@ +//! Fixed test vectors for indexer RPCs + +use std::time::Duration; + +use futures::StreamExt; +use tokio::task::JoinHandle; +use tower::BoxError; +use zebra_chain::{ + block::Height, + chain_tip::mock::{MockChainTip, MockChainTipSender}, +}; +use zebra_test::{ + mock_service::MockService, + prelude::color_eyre::{eyre::eyre, Result}, +}; + +use crate::indexer::{self, indexer_client::IndexerClient, Empty}; + +#[tokio::test] +async fn rpc_server_spawn() -> Result<()> { + let _init_guard = zebra_test::init(); + + let (_server_task, client, mock_chain_tip_sender) = start_server_and_get_client().await?; + + test_chain_tip_change(client.clone(), mock_chain_tip_sender).await?; + + Ok(()) +} + +async fn test_chain_tip_change( + mut client: IndexerClient, + mock_chain_tip_sender: MockChainTipSender, +) -> Result<()> { + let request = tonic::Request::new(Empty {}); + let mut response = client.chain_tip_change(request).await?.into_inner(); + mock_chain_tip_sender.send_best_tip_height(Height::MIN); + + // Wait for RPC server to send a message + tokio::time::sleep(Duration::from_millis(500)).await; + + tokio::time::timeout(Duration::from_secs(3), response.next()) + .await + .expect("should receive chain tip change notification before timeout") + .expect("response stream should not be empty") + .expect("chain tip change response should not be an error message"); + + Ok(()) +} + +async fn start_server_and_get_client() -> Result<( + JoinHandle>, + IndexerClient, + MockChainTipSender, +)> { + let listen_addr: std::net::SocketAddr = "127.0.0.1:0" + .parse() + .expect("hard-coded IP and u16 port should parse successfully"); + + let mock_read_service = MockService::build().for_unit_tests(); + let (mock_chain_tip_change, mock_chain_tip_change_sender) = MockChainTip::new(); + + let (server_task, listen_addr) = + indexer::server::init(listen_addr, mock_read_service, mock_chain_tip_change) + .await + .map_err(|err| eyre!(err))?; + + // wait for the server to start + tokio::time::sleep(Duration::from_secs(1)).await; + + // connect to the gRPC server + let client = IndexerClient::connect(format!("http://{listen_addr}")) + .await + .expect("server should receive connection"); + + Ok((server_task, client, mock_chain_tip_change_sender)) +} diff --git a/zebra-rpc/src/lib.rs b/zebra-rpc/src/lib.rs index d5b687913..778788c9e 100644 --- a/zebra-rpc/src/lib.rs +++ b/zebra-rpc/src/lib.rs @@ -11,5 +11,8 @@ pub mod queue; pub mod server; pub mod sync; +#[cfg(feature = "indexer-rpcs")] +pub mod indexer; + #[cfg(test)] mod tests; diff --git a/zebra-rpc/src/server/tests/vectors.rs b/zebra-rpc/src/server/tests/vectors.rs index 78b7bd815..9c50ecc7c 100644 --- a/zebra-rpc/src/server/tests/vectors.rs +++ b/zebra-rpc/src/server/tests/vectors.rs @@ -43,6 +43,7 @@ fn rpc_server_spawn(parallel_cpu_threads: bool) { let config = Config { listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0).into()), + indexer_listen_addr: None, parallel_cpu_threads: if parallel_cpu_threads { 2 } else { 1 }, debug_force_finished_sync: false, }; @@ -130,6 +131,7 @@ fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool, do_shutdown: bo #[allow(clippy::bool_to_int_with_if)] let config = Config { listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), + indexer_listen_addr: None, parallel_cpu_threads: if parallel_cpu_threads { 0 } else { 1 }, debug_force_finished_sync: false, }; @@ -210,6 +212,7 @@ fn rpc_server_spawn_port_conflict() { let port = zebra_test::net::random_known_port(); let config = Config { listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), + indexer_listen_addr: None, parallel_cpu_threads: 1, debug_force_finished_sync: false, }; @@ -320,6 +323,7 @@ fn rpc_server_spawn_port_conflict_parallel_auto() { let port = zebra_test::net::random_known_port(); let config = Config { listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), + indexer_listen_addr: None, parallel_cpu_threads: 2, debug_force_finished_sync: false, }; diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index a49bba996..548b12fd5 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -59,6 +59,9 @@ default-release-binaries = ["default", "sentry"] # Production features that activate extra dependencies, or extra features in dependencies +# Indexer RPC support +indexer-rpcs = ["zebra-rpc/indexer-rpcs"] + # Mining RPC support getblocktemplate-rpcs = [ "zebra-rpc/getblocktemplate-rpcs", diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 50d083d52..887f1cc02 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -252,6 +252,31 @@ impl StartCmd { config.network.network.clone(), ); + // TODO: Add a shutdown signal and start the server with `serve_with_incoming_shutdown()` if + // any related unit tests sometimes crash with memory errors + #[cfg(feature = "indexer-rpcs")] + let indexer_rpc_task_handle = + if let Some(indexer_listen_addr) = config.rpc.indexer_listen_addr { + info!("spawning indexer RPC server"); + let (indexer_rpc_task_handle, _listen_addr) = zebra_rpc::indexer::server::init( + indexer_listen_addr, + read_only_state_service.clone(), + latest_chain_tip.clone(), + ) + .await + .map_err(|err| eyre!(err))?; + + indexer_rpc_task_handle + } else { + warn!("configure an indexer_listen_addr to start the indexer RPC server"); + tokio::spawn(std::future::pending().in_current_span()) + }; + + #[cfg(not(feature = "indexer-rpcs"))] + // Spawn a dummy indexer rpc task which doesn't do anything and never finishes. + let indexer_rpc_task_handle: tokio::task::JoinHandle> = + tokio::spawn(std::future::pending().in_current_span()); + // Start concurrent tasks which don't add load to other tasks info!("spawning block gossip task"); let block_gossip_task_handle = tokio::spawn( @@ -367,6 +392,7 @@ impl StartCmd { // ongoing tasks pin!(rpc_task_handle); + pin!(indexer_rpc_task_handle); pin!(rpc_tx_queue_task_handle); pin!(syncer_task_handle); pin!(block_gossip_task_handle); @@ -400,6 +426,13 @@ impl StartCmd { Ok(()) } + indexer_rpc_join_result = &mut indexer_rpc_task_handle => { + let indexer_rpc_server_result = indexer_rpc_join_result + .expect("unexpected panic in the rpc task"); + info!(?indexer_rpc_server_result, "indexer rpc task exited"); + Ok(()) + } + rpc_tx_queue_result = &mut rpc_tx_queue_task_handle => { rpc_tx_queue_result .expect("unexpected panic in the rpc transaction queue task"); diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 810cf100f..6a287ae31 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -2963,21 +2963,17 @@ async fn trusted_chain_sync_handles_forks_correctly() -> Result<()> { use zebra_state::{ReadResponse, Response}; let _init_guard = zebra_test::init(); - let mut config = random_known_rpc_port_config(false, &Network::new_regtest(None))?; + let mut config = os_assigned_rpc_port_config(false, &Network::new_regtest(None))?; config.state.ephemeral = false; let network = config.network.network.clone(); - let rpc_address = config.rpc.listen_addr.unwrap(); let test_dir = testdir()?.with_config(&mut config)?; let mut child = test_dir.spawn_child(args!["start"])?; + let rpc_address = read_listen_addr_from_logs(&mut child, OPENED_RPC_ENDPOINT_MSG)?; tracing::info!("waiting for Zebra state cache to be opened"); - #[cfg(not(target_os = "windows"))] - child.expect_stdout_line_matches("marked database format as newly created")?; - - #[cfg(target_os = "windows")] tokio::time::sleep(LAUNCH_DELAY).await; tracing::info!("starting read state with syncer"); @@ -3185,14 +3181,10 @@ async fn trusted_chain_sync_handles_forks_correctly() -> Result<()> { let test_dir = testdir()?.with_config(&mut config)?; - let mut child = test_dir.spawn_child(args!["start"])?; + let _child = test_dir.spawn_child(args!["start"])?; tracing::info!("waiting for Zebra state cache to be opened"); - #[cfg(not(target_os = "windows"))] - child.expect_stdout_line_matches("marked database format as newly created")?; - - #[cfg(target_os = "windows")] tokio::time::sleep(LAUNCH_DELAY).await; tracing::info!("starting read state with syncer");