Breaking: fix(rpc): Add a config for multi-threaded RPC server requests (#5013)

* Add a config for increasing the number of RPC threads

* Add unit tests for parallel RPC servers

* Refactor tests to remove duplicate code

* Update the README

* Actually use parallel threads in some RPC tests
This commit is contained in:
teor 2022-09-04 15:03:15 +10:00 committed by GitHub
parent 0edb517c88
commit df9540833d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 232 additions and 20 deletions

1
Cargo.lock generated
View File

@ -6473,6 +6473,7 @@ dependencies = [
"jsonrpc-core", "jsonrpc-core",
"jsonrpc-derive", "jsonrpc-derive",
"jsonrpc-http-server", "jsonrpc-http-server",
"num_cpus",
"proptest", "proptest",
"proptest-derive", "proptest-derive",
"serde", "serde",

View File

@ -13,6 +13,7 @@
- [Beta Releases](#beta-releases) - [Beta Releases](#beta-releases)
- [Getting Started](#getting-started) - [Getting Started](#getting-started)
- [Build and Run Instructions](#build-and-run-instructions) - [Build and Run Instructions](#build-and-run-instructions)
- [Configuring JSON-RPC for lightwalletd](#configuring-json-rpc-for-lightwalletd)
- [Optional Features](#optional-features) - [Optional Features](#optional-features)
- [System Requirements](#system-requirements) - [System Requirements](#system-requirements)
- [Memory Troubleshooting](#memory-troubleshooting) - [Memory Troubleshooting](#memory-troubleshooting)
@ -91,6 +92,22 @@ for your platform:
For more detailed instructions, refer to the [documentation](https://zebra.zfnd.org/user/install.html). For more detailed instructions, refer to the [documentation](https://zebra.zfnd.org/user/install.html).
### Configuring JSON-RPC for lightwalletd
To use `zebrad` as a `lightwalletd` backend, give it this `~/.config/zebrad.toml`:
```toml
[rpc]
# listen for RPC queries on localhost
listen_addr = '127.0.0.1:8232'
# automatically use multiple CPU threads
parallel_cpu_threads = 0
```
**WARNING:** This config allows multiple Zebra instances to share the same RPC port.
See the [RPC config documentation](https://doc.zebra.zfnd.org/zebra_rpc/config/struct.Config.html) for details.
### Optional Features ### Optional Features
For performance reasons, some debugging and monitoring features are disabled in release builds. For performance reasons, some debugging and monitoring features are disabled in release builds.

View File

@ -21,6 +21,8 @@ hyper = { version = "0.14.20", features = ["http1", "server"] }
jsonrpc-core = "18.0.0" jsonrpc-core = "18.0.0"
jsonrpc-derive = "18.0.0" jsonrpc-derive = "18.0.0"
jsonrpc-http-server = "18.0.0" jsonrpc-http-server = "18.0.0"
num_cpus = "1.13.1"
# zebra-rpc needs the preserve_order feature in serde_json, which is a dependency of jsonrpc-core # zebra-rpc needs the preserve_order feature in serde_json, which is a dependency of jsonrpc-core
serde_json = { version = "1.0.85", features = ["preserve_order"] } serde_json = { version = "1.0.85", features = ["preserve_order"] }
indexmap = { version = "1.9.1", features = ["serde"] } indexmap = { version = "1.9.1", features = ["serde"] }

View File

@ -5,7 +5,7 @@ use std::net::SocketAddr;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// RPC configuration section. /// RPC configuration section.
#[derive(Clone, Debug, Default, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)] #[serde(deny_unknown_fields, default)]
pub struct Config { pub struct Config {
/// IP address and port for the RPC server. /// IP address and port for the RPC server.
@ -27,4 +27,38 @@ pub struct Config {
/// anyone on the internet can send transactions via your node. /// anyone on the internet can send transactions via your node.
/// They can also query your node's state. /// They can also query your node's state.
pub listen_addr: Option<SocketAddr>, pub listen_addr: Option<SocketAddr>,
/// The number of threads used to process RPC requests and responses.
///
/// Zebra's RPC server has a separate thread pool and a `tokio` executor for each thread.
/// State queries are run concurrently using the shared thread pool controlled by
/// the [`SyncSection.parallel_cpu_threads`](https://doc.zebra.zfnd.org/zebrad/config/struct.SyncSection.html#structfield.parallel_cpu_threads) config.
///
/// We recommend setting both configs to `0` (automatic scaling) for the best performance.
/// This uses one thread per available CPU core.
///
/// Set to `1` by default, which runs all RPC queries on a single thread, and detects RPC
/// port conflicts from multiple Zebra or `zcashd` instances.
///
/// For details, see [the `jsonrpc_http_server` documentation](https://docs.rs/jsonrpc-http-server/latest/jsonrpc_http_server/struct.ServerBuilder.html#method.threads).
///
/// ## Warning
///
/// Changing this config disables RPC port conflict detection.
/// This can allow multiple Zebra instances to share the same RPC port.
///
/// If some of those instances are outdated or failed, RPC queries can be slow or inconsistent.
pub parallel_cpu_threads: usize,
}
impl Default for Config {
fn default() -> Self {
Self {
// Disable RPCs by default.
listen_addr: None,
// Use a single thread, so we can detect RPC port conflicts.
parallel_cpu_threads: 1,
}
}
} }

View File

@ -73,6 +73,12 @@ impl RpcServer {
MetaIoHandler::new(Compatibility::Both, TracingMiddleware); MetaIoHandler::new(Compatibility::Both, TracingMiddleware);
io.extend_with(rpc_impl.to_delegate()); io.extend_with(rpc_impl.to_delegate());
// If zero, automatically scale threads to the number of CPU cores
let mut parallel_cpu_threads = config.parallel_cpu_threads;
if parallel_cpu_threads == 0 {
parallel_cpu_threads = num_cpus::get();
}
// The server is a blocking task, which blocks on executor shutdown. // The server is a blocking task, which blocks on executor shutdown.
// So we need to create and spawn it on a std::thread, inside a tokio blocking task. // So we need to create and spawn it on a std::thread, inside a tokio blocking task.
// (Otherwise tokio panics when we shut down the RPC server.) // (Otherwise tokio panics when we shut down the RPC server.)
@ -85,12 +91,8 @@ impl RpcServer {
// TODO: // TODO:
// - return server.close_handle(), which can shut down the RPC server, // - return server.close_handle(), which can shut down the RPC server,
// and add it to the server tests // and add it to the server tests
// - allow multiple RPC threads
// (when jsonrpc_http_server has multiple threads,
// it lets any process share its port - do we need to warn users?)
// - make the number of RPC threads configurable
let server = ServerBuilder::new(io) let server = ServerBuilder::new(io)
.threads(1) .threads(parallel_cpu_threads)
// TODO: disable this security check if we see errors from lightwalletd // TODO: disable this security check if we see errors from lightwalletd
//.allowed_hosts(DomainsValidation::Disabled) //.allowed_hosts(DomainsValidation::Disabled)
.request_middleware(FixHttpRequestMiddleware) .request_middleware(FixHttpRequestMiddleware)

View File

@ -15,14 +15,29 @@ use zebra_test::mock_service::MockService;
use super::super::*; use super::super::*;
/// Test if the RPC server will spawn on a randomly generated port. /// Test that the JSON-RPC server spawns when configured with a single thread.
#[test] #[test]
fn rpc_server_spawn() { fn rpc_server_spawn_single_thread() {
rpc_server_spawn(false)
}
/// Test that the JSON-RPC server spawns when configured with multiple threads.
#[test]
fn rpc_sever_spawn_parallel_threads() {
rpc_server_spawn(true)
}
/// Test if the RPC server will spawn on a randomly generated port.
///
/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores.
#[tracing::instrument]
fn rpc_server_spawn(parallel_cpu_threads: bool) {
let _init_guard = zebra_test::init(); let _init_guard = zebra_test::init();
let port = zebra_test::net::random_known_port(); let port = zebra_test::net::random_known_port();
let config = Config { let config = Config {
listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
parallel_cpu_threads: if parallel_cpu_threads { 2 } else { 1 },
}; };
let rt = tokio::runtime::Runtime::new().unwrap(); let rt = tokio::runtime::Runtime::new().unwrap();
@ -62,14 +77,31 @@ fn rpc_server_spawn() {
rt.shutdown_timeout(Duration::from_secs(1)); rt.shutdown_timeout(Duration::from_secs(1));
} }
/// Test if the RPC server will spawn on an OS-assigned unallocated port. /// Test that the JSON-RPC server spawns when configured with a single thread,
/// on an OS-assigned unallocated port.
#[test] #[test]
fn rpc_server_spawn_unallocated_port() { fn rpc_server_spawn_unallocated_port_single_thread() {
rpc_server_spawn_unallocated_port(false)
}
/// Test that the JSON-RPC server spawn when configured with multiple threads,
/// on an OS-assigned unallocated port.
#[test]
fn rpc_sever_spawn_unallocated_port_parallel_threads() {
rpc_server_spawn_unallocated_port(true)
}
/// Test if the RPC server will spawn on an OS-assigned unallocated port.
///
/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores.
#[tracing::instrument]
fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool) {
let _init_guard = zebra_test::init(); let _init_guard = zebra_test::init();
let port = zebra_test::net::random_unallocated_port(); let port = zebra_test::net::random_unallocated_port();
let config = Config { let config = Config {
listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
parallel_cpu_threads: if parallel_cpu_threads { 0 } else { 1 },
}; };
let rt = tokio::runtime::Runtime::new().unwrap(); let rt = tokio::runtime::Runtime::new().unwrap();
@ -110,9 +142,6 @@ fn rpc_server_spawn_unallocated_port() {
} }
/// Test if the RPC server will panic correctly when there is a port conflict. /// Test if the RPC server will panic correctly when there is a port conflict.
///
/// TODO: update this test when the number of threads is configurable
/// (when jsonrpc_http_server has multiple threads, it lets any process share its port!)
#[test] #[test]
#[should_panic(expected = "Unable to start RPC server")] #[should_panic(expected = "Unable to start RPC server")]
fn rpc_server_spawn_port_conflict() { fn rpc_server_spawn_port_conflict() {
@ -121,6 +150,7 @@ fn rpc_server_spawn_port_conflict() {
let port = zebra_test::net::random_known_port(); let port = zebra_test::net::random_known_port();
let config = Config { let config = Config {
listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
parallel_cpu_threads: 1,
}; };
let rt = tokio::runtime::Runtime::new().unwrap(); let rt = tokio::runtime::Runtime::new().unwrap();
@ -199,3 +229,99 @@ fn rpc_server_spawn_port_conflict() {
}, },
} }
} }
/// Check if the RPC server detects a port conflict when running parallel threads.
///
/// If this test fails, that's great!
/// We can make parallel the default, and remove the warnings in the config docs.
#[test]
fn rpc_server_spawn_port_conflict_parallel_auto() {
let _init_guard = zebra_test::init();
let port = zebra_test::net::random_known_port();
let config = Config {
listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
parallel_cpu_threads: 2,
};
let rt = tokio::runtime::Runtime::new().unwrap();
let test_task_handle = rt.spawn(async {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
info!("spawning parallel RPC server 1...");
let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle) = RpcServer::spawn(
config.clone(),
"RPC server 1 test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
NoChainTip,
Mainnet,
);
tokio::time::sleep(Duration::from_secs(3)).await;
info!("spawning parallel conflicted RPC server 2...");
let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle) = RpcServer::spawn(
config,
"RPC server 2 conflict test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
NoChainTip,
Mainnet,
);
info!("spawned RPC servers, checking services...");
mempool.expect_no_requests().await;
state.expect_no_requests().await;
// Because there might be a panic inside a multi-threaded executor,
// we can't depend on the exact behaviour of the other tasks,
// particularly across different machines and OSes.
// The second server doesn't panic, but we'd like it to.
// (See the function docs for details.)
let rpc_server_2_task_result = rpc_server_2_task_handle.await;
match rpc_server_2_task_result {
Ok(()) => info!(
"Parallel RPC server with conflicting port should exit with an error: \
but we're ok with it ignoring the conflict for now"
),
Err(join_error) => match join_error.try_into_panic() {
Ok(panic_object) => panic::resume_unwind(panic_object),
Err(cancelled_error) => info!(
"Parallel RPC server with conflicting port should exit with an error: \
but we're ok with it ignoring the conflict for now: \
unexpected JoinError: {cancelled_error:?}"
),
},
}
// Ignore the queue task result
});
// Wait until the spawned task finishes
std::thread::sleep(Duration::from_secs(10));
info!("waiting for parallel RPC server to shut down...");
rt.shutdown_timeout(Duration::from_secs(3));
match test_task_handle.now_or_never() {
Some(Ok(())) => {
info!("parallel RPC server task successfully exited");
}
None => panic!("unexpected test task hang"),
Some(Err(join_error)) => match join_error.try_into_panic() {
Ok(panic_object) => panic::resume_unwind(panic_object),
Err(cancelled_error) => info!(
"Parallel RPC server with conflicting port should exit with an error: \
but we're ok with it ignoring the conflict for now: \
unexpected JoinError: {cancelled_error:?}"
),
},
}
}

View File

@ -213,7 +213,7 @@ pub struct SyncSection {
/// The number of threads used to verify signatures, proofs, and other CPU-intensive code. /// The number of threads used to verify signatures, proofs, and other CPU-intensive code.
/// ///
/// Set to `0` by default, which uses one thread per available CPU core. /// Set to `0` by default, which uses one thread per available CPU core.
/// For details, see [the rayon documentation](https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads). /// For details, see [the `rayon` documentation](https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads).
pub parallel_cpu_threads: usize, pub parallel_cpu_threads: usize,
} }

View File

@ -1145,8 +1145,25 @@ async fn tracing_endpoint() -> Result<()> {
Ok(()) Ok(())
} }
/// Test that the JSON-RPC endpoint responds to a request,
/// when configured with a single thread.
#[tokio::test] #[tokio::test]
async fn rpc_endpoint() -> Result<()> { async fn rpc_endpoint_single_thread() -> Result<()> {
rpc_endpoint(false).await
}
/// Test that the JSON-RPC endpoint responds to a request,
/// when configured with multiple threads.
#[tokio::test]
async fn rpc_endpoint_parallel_threads() -> Result<()> {
rpc_endpoint(true).await
}
/// Test that the JSON-RPC endpoint responds to a request.
///
/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores.
#[tracing::instrument]
async fn rpc_endpoint(parallel_cpu_threads: bool) -> Result<()> {
use hyper::{body::to_bytes, Body, Client, Method, Request}; use hyper::{body::to_bytes, Body, Client, Method, Request};
use serde_json::Value; use serde_json::Value;
@ -1157,7 +1174,7 @@ async fn rpc_endpoint() -> Result<()> {
// Write a configuration that has RPC listen_addr set // Write a configuration that has RPC listen_addr set
// [Note on port conflict](#Note on port conflict) // [Note on port conflict](#Note on port conflict)
let mut config = random_known_rpc_port_config()?; let mut config = random_known_rpc_port_config(parallel_cpu_threads)?;
let url = format!("http://{}", config.rpc.listen_addr.unwrap()); let url = format!("http://{}", config.rpc.listen_addr.unwrap());
let dir = testdir()?.with_config(&mut config)?; let dir = testdir()?.with_config(&mut config)?;
@ -1648,7 +1665,9 @@ fn zebra_rpc_conflict() -> Result<()> {
// Write a configuration that has RPC listen_addr set // Write a configuration that has RPC listen_addr set
// [Note on port conflict](#Note on port conflict) // [Note on port conflict](#Note on port conflict)
let mut config = random_known_rpc_port_config()?; //
// This is the required setting to detect port conflicts.
let mut config = random_known_rpc_port_config(false)?;
let dir1 = testdir()?.with_config(&mut config)?; let dir1 = testdir()?.with_config(&mut config)?;
let regex1 = regex::escape(&format!( let regex1 = regex::escape(&format!(

View File

@ -215,7 +215,8 @@ pub fn spawn_zebrad_for_rpc_without_initial_peers<P: ZebradTestDirExt + std::fmt
zebra_directory: P, zebra_directory: P,
test_type: LightwalletdTestType, test_type: LightwalletdTestType,
) -> Result<(TestChild<P>, SocketAddr)> { ) -> Result<(TestChild<P>, SocketAddr)> {
let mut config = random_known_rpc_port_config() // This is what we recommend our users configure.
let mut config = random_known_rpc_port_config(true)
.expect("Failed to create a config file with a known RPC listener port"); .expect("Failed to create a config file with a known RPC listener port");
config.state.ephemeral = false; config.state.ephemeral = false;

View File

@ -83,7 +83,9 @@ pub fn zebra_skip_lightwalletd_tests() -> bool {
} }
/// Returns a `zebrad` config with a random known RPC port. /// Returns a `zebrad` config with a random known RPC port.
pub fn random_known_rpc_port_config() -> Result<ZebradConfig> { ///
/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores.
pub fn random_known_rpc_port_config(parallel_cpu_threads: bool) -> Result<ZebradConfig> {
// [Note on port conflict](#Note on port conflict) // [Note on port conflict](#Note on port conflict)
let listen_port = random_known_port(); let listen_port = random_known_port();
let listen_ip = "127.0.0.1".parse().expect("hard-coded IP is valid"); let listen_ip = "127.0.0.1".parse().expect("hard-coded IP is valid");
@ -93,6 +95,13 @@ pub fn random_known_rpc_port_config() -> Result<ZebradConfig> {
// TODO: split this config into another function? // TODO: split this config into another function?
let mut config = default_test_config()?; let mut config = default_test_config()?;
config.rpc.listen_addr = Some(zebra_rpc_listener); config.rpc.listen_addr = Some(zebra_rpc_listener);
if parallel_cpu_threads {
// Auto-configure to the number of CPU cores: most users configre this
config.rpc.parallel_cpu_threads = 0;
} else {
// Default config, users who want to detect port conflicts configure this
config.rpc.parallel_cpu_threads = 1;
}
Ok(config) Ok(config)
} }
@ -306,7 +315,8 @@ impl LightwalletdTestType {
/// and `Some(Err(_))` if the config could not be created. /// and `Some(Err(_))` if the config could not be created.
pub fn zebrad_config(&self, test_name: String) -> Option<Result<ZebradConfig>> { pub fn zebrad_config(&self, test_name: String) -> Option<Result<ZebradConfig>> {
let config = if self.launches_lightwalletd() { let config = if self.launches_lightwalletd() {
random_known_rpc_port_config() // This is what we recommend our users configure.
random_known_rpc_port_config(true)
} else { } else {
default_test_config() default_test_config()
}; };