add(rpc): Add a tonic server in zebra-rpc (#8674)

* adds a tonic server

* Adds a test stub, moves method impls to their own modules, minor fixes.

* Moves indexer rpc mod behind a feature, adds a config field for its listen address, and initializes the indexer RPC when zebrad starts

* Skips tonic_build() in zebra-rpc build script unless indexer-rpcs feature is selected, simplifies indexer.proto file, makes tonic deps optional

* formats zebra-rpc Cargo.toml

* Adds tokio_stream dependency, adds chain_tip_change field to IndexerRPC, and implements a simple version of the chain_tip_change RPC method

* passes latest chain tip to indexer::server::init from start cmd and  updates vectors test

* Update zebra-rpc/src/config.rs

* fixes a race condition in trusted_chain_sync_handles_forks_correctly
This commit is contained in:
Arya 2024-07-16 08:25:31 -04:00 committed by GitHub
parent 1238ec0c63
commit 14463a7f5d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 371 additions and 20 deletions

View File

@ -5988,11 +5988,16 @@ dependencies = [
"jsonrpc-derive",
"jsonrpc-http-server",
"proptest",
"prost",
"rand 0.8.5",
"serde",
"serde_json",
"thiserror",
"tokio",
"tokio-stream",
"tonic 0.11.0",
"tonic-build 0.11.0",
"tonic-reflection",
"tower",
"tracing",
"zcash_address",

View File

@ -12,10 +12,23 @@ homepage = "https://zfnd.org/zebra/"
# crates.io is limited to 5 keywords and categories
keywords = ["zebra", "zcash"]
# Must be one of <https://crates.io/category_slugs>
categories = ["asynchronous", "cryptography::cryptocurrencies", "encoding", "network-programming"]
categories = [
"asynchronous",
"cryptography::cryptocurrencies",
"encoding",
"network-programming",
]
[features]
indexer-rpcs = [
"tonic-build",
"tonic",
"tonic-reflection",
"prost",
"tokio-stream",
]
# Production features that activate extra dependencies, or extra features in dependencies
# Mining RPC support
@ -41,7 +54,10 @@ proptest-impl = [
]
[dependencies]
chrono = { version = "0.4.38", default-features = false, features = ["clock", "std"] }
chrono = { version = "0.4.38", default-features = false, features = [
"clock",
"std",
] }
futures = "0.3.30"
# lightwalletd sends JSON-RPC requests over HTTP 1.1
@ -55,14 +71,26 @@ jsonrpc-http-server = "18.0.0"
serde_json = { version = "1.0.120", features = ["preserve_order"] }
indexmap = { version = "2.2.6", features = ["serde"] }
tokio = { version = "1.37.0", features = ["time", "rt-multi-thread", "macros", "tracing"] }
tokio = { version = "1.37.0", features = [
"time",
"rt-multi-thread",
"macros",
"tracing",
] }
tower = "0.4.13"
# indexer-rpcs dependencies
tonic = { version = "0.11.0", optional = true }
tonic-reflection = { version = "0.11.0", optional = true }
prost = { version = "0.12.6", optional = true }
tokio-stream = { version = "0.1.15", optional = true }
tracing = "0.1.39"
hex = { version = "0.4.3", features = ["serde"] }
serde = { version = "1.0.204", features = ["serde_derive"] }
zcash_primitives = { version = "0.15.0" }
# Experimental feature getblocktemplate-rpcs
@ -73,13 +101,20 @@ zcash_address = { version = "0.3.2", optional = true }
# Test-only feature proptest-impl
proptest = { version = "1.4.0", optional = true }
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = ["json-conversion"] }
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = [
"json-conversion",
] }
zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38" }
zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38" }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = ["rpc-client"] }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = [
"rpc-client",
] }
zebra-script = { path = "../zebra-script", version = "1.0.0-beta.38" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38" }
[build-dependencies]
tonic-build = { version = "0.11.0", optional = true }
[dev-dependencies]
insta = { version = "1.39.0", features = ["redactions", "json", "ron"] }
@ -88,9 +123,17 @@ proptest = "1.4.0"
thiserror = "1.0.61"
tokio = { version = "1.37.0", features = ["full", "tracing", "test-util"] }
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = [
"proptest-impl",
] }
zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38", features = [
"proptest-impl",
] }
zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38", features = [
"proptest-impl",
] }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38", features = [
"proptest-impl",
] }
zebra-test = { path = "../zebra-test", version = "1.0.0-beta.38" }

15
zebra-rpc/build.rs Normal file
View File

@ -0,0 +1,15 @@
//! Compile proto files
fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(feature = "indexer-rpcs")]
{
use std::{env, path::PathBuf};
let out_dir = env::var("OUT_DIR").map(PathBuf::from);
tonic_build::configure()
.type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]")
.file_descriptor_set_path(out_dir.unwrap().join("indexer_descriptor.bin"))
.compile(&["proto/indexer.proto"], &[""])?;
}
Ok(())
}

View File

@ -0,0 +1,10 @@
syntax = "proto3";
package zebra.indexer.rpc;
// Used by methods that take no arguments.
message Empty {};
service Indexer {
// Notifies listeners of chain tip changes
rpc ChainTipChange(Empty) returns (stream Empty);
}

View File

@ -30,6 +30,22 @@ pub struct Config {
/// They can also query your node's state.
pub listen_addr: Option<SocketAddr>,
/// IP address and port for the indexer RPC server.
///
/// Note: The indexer RPC server is disabled by default.
/// To enable the indexer RPC server, compile `zebrad` with the
/// `indexer` feature flag and set a listen address in the config:
/// ```toml
/// [rpc]
/// indexer_listen_addr = '127.0.0.1:8230'
/// ```
///
/// # Security
///
/// If you bind Zebra's indexer RPC port to a public IP address,
/// anyone on the internet can query your node's state.
pub indexer_listen_addr: Option<SocketAddr>,
/// The number of threads used to process RPC requests and responses.
///
/// Zebra's RPC server has a separate thread pool and a `tokio` executor for each thread.
@ -65,6 +81,9 @@ impl Default for Config {
// Disable RPCs by default.
listen_addr: None,
// Disable indexer RPCs by default.
indexer_listen_addr: None,
// Use a single thread, so we can detect RPC port conflicts.
#[cfg(not(feature = "getblocktemplate-rpcs"))]
parallel_cpu_threads: 1,

13
zebra-rpc/src/indexer.rs Normal file
View File

@ -0,0 +1,13 @@
//! A tonic RPC server for Zebra's indexer API.
#[cfg(test)]
mod tests;
pub mod methods;
pub mod server;
// The generated indexer proto
tonic::include_proto!("zebra.indexer.rpc");
pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("indexer_descriptor");

View File

@ -0,0 +1,57 @@
//! Implements `Indexer` methods on the `IndexerRPC` type
use std::pin::Pin;
use futures::Stream;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Response, Status};
use tower::BoxError;
use zebra_chain::chain_tip::ChainTip;
use super::{indexer_server::Indexer, server::IndexerRPC, Empty};
/// The maximum number of messages that can be queued to be streamed to a client
const RESPONSE_BUFFER_SIZE: usize = 10_000;
#[tonic::async_trait]
impl<ReadStateService, Tip> Indexer for IndexerRPC<ReadStateService, Tip>
where
ReadStateService: tower::Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
type ChainTipChangeStream = Pin<Box<dyn Stream<Item = Result<Empty, Status>> + Send>>;
async fn chain_tip_change(
&self,
_: tonic::Request<Empty>,
) -> Result<Response<Self::ChainTipChangeStream>, Status> {
let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
let response_stream = ReceiverStream::new(response_receiver);
let mut chain_tip_change = self.chain_tip_change.clone();
tokio::spawn(async move {
// Notify the client of chain tip changes until the channel is closed
while let Ok(()) = chain_tip_change.best_tip_changed().await {
let tx = response_sender.clone();
tokio::spawn(async move { tx.send(Ok(Empty {})).await });
}
let _ = response_sender
.send(Err(Status::unavailable(
"chain_tip_change channel has closed",
)))
.await;
});
Ok(Response::new(Box::pin(response_stream)))
}
}

View File

@ -0,0 +1,77 @@
//! A tonic RPC server for Zebra's indexer API.
use std::net::SocketAddr;
use tokio::task::JoinHandle;
use tonic::transport::{server::TcpIncoming, Server};
use tower::BoxError;
use zebra_chain::chain_tip::ChainTip;
use super::indexer_server::IndexerServer;
type ServerTask = JoinHandle<Result<(), BoxError>>;
/// Indexer RPC service.
pub struct IndexerRPC<ReadStateService, Tip>
where
ReadStateService: tower::Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
_read_state: ReadStateService,
pub(super) chain_tip_change: Tip,
}
/// Initializes the indexer RPC server
pub async fn init<ReadStateService, Tip>(
listen_addr: SocketAddr,
_read_state: ReadStateService,
chain_tip_change: Tip,
) -> Result<(ServerTask, SocketAddr), BoxError>
where
ReadStateService: tower::Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
let indexer_service = IndexerRPC {
_read_state,
chain_tip_change,
};
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(crate::indexer::FILE_DESCRIPTOR_SET)
.build()
.unwrap();
tracing::info!("Trying to open indexer RPC endpoint at {}...", listen_addr,);
let tcp_listener = tokio::net::TcpListener::bind(listen_addr).await?;
let listen_addr = tcp_listener.local_addr()?;
let incoming = TcpIncoming::from_listener(tcp_listener, true, None)?;
let server_task: JoinHandle<Result<(), BoxError>> = tokio::spawn(async move {
Server::builder()
.add_service(reflection_service)
.add_service(IndexerServer::new(indexer_service))
.serve_with_incoming(incoming)
.await?;
Ok(())
});
Ok((server_task, listen_addr))
}

View File

@ -0,0 +1 @@
mod vectors;

View File

@ -0,0 +1,76 @@
//! Fixed test vectors for indexer RPCs
use std::time::Duration;
use futures::StreamExt;
use tokio::task::JoinHandle;
use tower::BoxError;
use zebra_chain::{
block::Height,
chain_tip::mock::{MockChainTip, MockChainTipSender},
};
use zebra_test::{
mock_service::MockService,
prelude::color_eyre::{eyre::eyre, Result},
};
use crate::indexer::{self, indexer_client::IndexerClient, Empty};
#[tokio::test]
async fn rpc_server_spawn() -> Result<()> {
let _init_guard = zebra_test::init();
let (_server_task, client, mock_chain_tip_sender) = start_server_and_get_client().await?;
test_chain_tip_change(client.clone(), mock_chain_tip_sender).await?;
Ok(())
}
async fn test_chain_tip_change(
mut client: IndexerClient<tonic::transport::Channel>,
mock_chain_tip_sender: MockChainTipSender,
) -> Result<()> {
let request = tonic::Request::new(Empty {});
let mut response = client.chain_tip_change(request).await?.into_inner();
mock_chain_tip_sender.send_best_tip_height(Height::MIN);
// Wait for RPC server to send a message
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::timeout(Duration::from_secs(3), response.next())
.await
.expect("should receive chain tip change notification before timeout")
.expect("response stream should not be empty")
.expect("chain tip change response should not be an error message");
Ok(())
}
async fn start_server_and_get_client() -> Result<(
JoinHandle<Result<(), BoxError>>,
IndexerClient<tonic::transport::Channel>,
MockChainTipSender,
)> {
let listen_addr: std::net::SocketAddr = "127.0.0.1:0"
.parse()
.expect("hard-coded IP and u16 port should parse successfully");
let mock_read_service = MockService::build().for_unit_tests();
let (mock_chain_tip_change, mock_chain_tip_change_sender) = MockChainTip::new();
let (server_task, listen_addr) =
indexer::server::init(listen_addr, mock_read_service, mock_chain_tip_change)
.await
.map_err(|err| eyre!(err))?;
// wait for the server to start
tokio::time::sleep(Duration::from_secs(1)).await;
// connect to the gRPC server
let client = IndexerClient::connect(format!("http://{listen_addr}"))
.await
.expect("server should receive connection");
Ok((server_task, client, mock_chain_tip_change_sender))
}

View File

@ -11,5 +11,8 @@ pub mod queue;
pub mod server;
pub mod sync;
#[cfg(feature = "indexer-rpcs")]
pub mod indexer;
#[cfg(test)]
mod tests;

View File

@ -43,6 +43,7 @@ fn rpc_server_spawn(parallel_cpu_threads: bool) {
let config = Config {
listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0).into()),
indexer_listen_addr: None,
parallel_cpu_threads: if parallel_cpu_threads { 2 } else { 1 },
debug_force_finished_sync: false,
};
@ -130,6 +131,7 @@ fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool, do_shutdown: bo
#[allow(clippy::bool_to_int_with_if)]
let config = Config {
listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
indexer_listen_addr: None,
parallel_cpu_threads: if parallel_cpu_threads { 0 } else { 1 },
debug_force_finished_sync: false,
};
@ -210,6 +212,7 @@ fn rpc_server_spawn_port_conflict() {
let port = zebra_test::net::random_known_port();
let config = Config {
listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
indexer_listen_addr: None,
parallel_cpu_threads: 1,
debug_force_finished_sync: false,
};
@ -320,6 +323,7 @@ fn rpc_server_spawn_port_conflict_parallel_auto() {
let port = zebra_test::net::random_known_port();
let config = Config {
listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
indexer_listen_addr: None,
parallel_cpu_threads: 2,
debug_force_finished_sync: false,
};

View File

@ -59,6 +59,9 @@ default-release-binaries = ["default", "sentry"]
# Production features that activate extra dependencies, or extra features in dependencies
# Indexer RPC support
indexer-rpcs = ["zebra-rpc/indexer-rpcs"]
# Mining RPC support
getblocktemplate-rpcs = [
"zebra-rpc/getblocktemplate-rpcs",

View File

@ -252,6 +252,31 @@ impl StartCmd {
config.network.network.clone(),
);
// TODO: Add a shutdown signal and start the server with `serve_with_incoming_shutdown()` if
// any related unit tests sometimes crash with memory errors
#[cfg(feature = "indexer-rpcs")]
let indexer_rpc_task_handle =
if let Some(indexer_listen_addr) = config.rpc.indexer_listen_addr {
info!("spawning indexer RPC server");
let (indexer_rpc_task_handle, _listen_addr) = zebra_rpc::indexer::server::init(
indexer_listen_addr,
read_only_state_service.clone(),
latest_chain_tip.clone(),
)
.await
.map_err(|err| eyre!(err))?;
indexer_rpc_task_handle
} else {
warn!("configure an indexer_listen_addr to start the indexer RPC server");
tokio::spawn(std::future::pending().in_current_span())
};
#[cfg(not(feature = "indexer-rpcs"))]
// Spawn a dummy indexer rpc task which doesn't do anything and never finishes.
let indexer_rpc_task_handle: tokio::task::JoinHandle<Result<(), tower::BoxError>> =
tokio::spawn(std::future::pending().in_current_span());
// Start concurrent tasks which don't add load to other tasks
info!("spawning block gossip task");
let block_gossip_task_handle = tokio::spawn(
@ -367,6 +392,7 @@ impl StartCmd {
// ongoing tasks
pin!(rpc_task_handle);
pin!(indexer_rpc_task_handle);
pin!(rpc_tx_queue_task_handle);
pin!(syncer_task_handle);
pin!(block_gossip_task_handle);
@ -400,6 +426,13 @@ impl StartCmd {
Ok(())
}
indexer_rpc_join_result = &mut indexer_rpc_task_handle => {
let indexer_rpc_server_result = indexer_rpc_join_result
.expect("unexpected panic in the rpc task");
info!(?indexer_rpc_server_result, "indexer rpc task exited");
Ok(())
}
rpc_tx_queue_result = &mut rpc_tx_queue_task_handle => {
rpc_tx_queue_result
.expect("unexpected panic in the rpc transaction queue task");

View File

@ -2963,21 +2963,17 @@ async fn trusted_chain_sync_handles_forks_correctly() -> Result<()> {
use zebra_state::{ReadResponse, Response};
let _init_guard = zebra_test::init();
let mut config = random_known_rpc_port_config(false, &Network::new_regtest(None))?;
let mut config = os_assigned_rpc_port_config(false, &Network::new_regtest(None))?;
config.state.ephemeral = false;
let network = config.network.network.clone();
let rpc_address = config.rpc.listen_addr.unwrap();
let test_dir = testdir()?.with_config(&mut config)?;
let mut child = test_dir.spawn_child(args!["start"])?;
let rpc_address = read_listen_addr_from_logs(&mut child, OPENED_RPC_ENDPOINT_MSG)?;
tracing::info!("waiting for Zebra state cache to be opened");
#[cfg(not(target_os = "windows"))]
child.expect_stdout_line_matches("marked database format as newly created")?;
#[cfg(target_os = "windows")]
tokio::time::sleep(LAUNCH_DELAY).await;
tracing::info!("starting read state with syncer");
@ -3185,14 +3181,10 @@ async fn trusted_chain_sync_handles_forks_correctly() -> Result<()> {
let test_dir = testdir()?.with_config(&mut config)?;
let mut child = test_dir.spawn_child(args!["start"])?;
let _child = test_dir.spawn_child(args!["start"])?;
tracing::info!("waiting for Zebra state cache to be opened");
#[cfg(not(target_os = "windows"))]
child.expect_stdout_line_matches("marked database format as newly created")?;
#[cfg(target_os = "windows")]
tokio::time::sleep(LAUNCH_DELAY).await;
tracing::info!("starting read state with syncer");