fix(rpc): Shut down the RPC server properly when Zebra shuts down (#5591)

* Make the queue runner task shut down when the RpcImpl is dropped

* Move RPC server startup into the spawn() tokio future

* Return a shutdown handle from the RPC spawn() method

* Shut down the RPC server properly when Zebra shuts down

* Add a changelog entry for this security fix

* Call RpcServer::shutdown() when it is dropped, and wait

* Block on RPC server shutdown when Zebra's tasks have an error
Authored by teor on 2022-11-11 00:51:53 +10:00, committed by GitHub
parent a815e9d252
commit 074733d183
7 changed files with 271 additions and 104 deletions
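
Taken together, the changes above give Zebra an explicit shutdown path for its JSON-RPC server: `spawn()` now returns an optional `RpcServer` handle alongside the two task handles, callers can shut the server down through it, and its `Drop` impl does the same as a last resort. A self-contained toy sketch of that pattern (made-up names and plain `std` types, not Zebra's code; the real `shutdown_blocking()` also blocks until the jsonrpc server has fully stopped):

// Toy sketch only: illustrates the handle-plus-Drop shutdown design.
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::{thread, time::Duration};

struct ToyServer {
    close_handle: Arc<AtomicBool>,
}

impl ToyServer {
    fn spawn() -> (thread::JoinHandle<()>, Option<ToyServer>) {
        let close_handle = Arc::new(AtomicBool::new(false));
        let server_flag = close_handle.clone();

        // Stand-in for the blocking jsonrpc server: run until asked to close.
        let task = thread::spawn(move || {
            while !server_flag.load(Ordering::SeqCst) {
                thread::sleep(Duration::from_millis(10));
            }
            println!("Stopped RPC endpoint");
        });

        (task, Some(ToyServer { close_handle }))
    }

    fn shutdown_blocking(&self) {
        println!("Stopping RPC server");
        self.close_handle.store(true, Ordering::SeqCst);
    }
}

impl Drop for ToyServer {
    fn drop(&mut self) {
        // Mirrors the new Drop impl in the diff: always shut the server down,
        // even if the caller never called shutdown_blocking() explicitly.
        self.shutdown_blocking();
    }
}

fn main() {
    let (task, server) = ToyServer::spawn();
    drop(server); // or: server.expect("RPC port configured").shutdown_blocking();
    task.join().expect("server thread should exit cleanly");
}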

View File

@ -4,6 +4,24 @@ All notable changes to Zebra are documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org).
## [Zebra 1.0.0-rc.2](https://github.com/ZcashFoundation/zebra/releases/tag/v1.0.0-rc.2) - 2022-11-TODO
Zebra's latest release continues work on mining pool RPCs, and fixes a rare RPC crash that could lead to memory corruption.
Zebra's consensus rules, node sync, and `lightwalletd` RPCs are ready for user testing and experimental use. Zebra has not been audited yet.
### Breaking Changes
This release has the following breaking changes:
- TODO: search the changelog for breaking changes
### Security
- Fix a rare crash and memory errors when Zebra's RPC server shuts down ([#5591](https://github.com/ZcashFoundation/zebra/pull/5591))
TODO: the rest of the changelog
## [Zebra 1.0.0-rc.1](https://github.com/ZcashFoundation/zebra/releases/tag/v1.0.0-rc.1) - 2022-11-02
This is the second Zebra release candidate. Zebra's consensus rules, node sync, and `lightwalletd` RPCs are ready for user testing and experimental use. Zebra has not been audited yet.

View File

@ -14,7 +14,7 @@ use hex::{FromHex, ToHex};
use indexmap::IndexMap;
use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use tokio::{sync::broadcast::Sender, task::JoinHandle};
use tokio::{sync::broadcast, task::JoinHandle};
use tower::{buffer::Buffer, Service, ServiceExt};
use tracing::Instrument;
@ -278,8 +278,8 @@ where
// Tasks
//
/// A sender component of a channel used to send transactions to the queue.
queue_sender: Sender<Option<UnminedTx>>,
/// A sender component of a channel used to send transactions to the mempool queue.
queue_sender: broadcast::Sender<UnminedTx>,
}
impl<Mempool, State, Tip> RpcImpl<Mempool, State, Tip>
@ -313,7 +313,7 @@ where
<Mempool as Service<mempool::Request>>::Future: Send,
<State as Service<zebra_state::ReadRequest>>::Future: Send,
{
let runner = Queue::start();
let (runner, queue_sender) = Queue::start();
let mut app_version = app_version.to_string();
@ -329,7 +329,7 @@ where
mempool: mempool.clone(),
state: state.clone(),
latest_chain_tip: latest_chain_tip.clone(),
queue_sender: runner.sender(),
queue_sender,
};
// run the process queue
@ -517,7 +517,7 @@ where
// send transaction to the rpc queue, ignore any error.
let unmined_transaction = UnminedTx::from(raw_transaction.clone());
let _ = queue_sender.send(Some(unmined_transaction));
let _ = queue_sender.send(unmined_transaction);
let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into());
let request = mempool::Request::Queue(vec![transaction_parameter]);
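
A note on the `let _ = queue_sender.send(unmined_transaction)` call above: with a broadcast channel, `send()` only fails when there are no live receivers, which here means the queue runner has already shut down, and `sendrawtransaction` still forwards the transaction to the mempool afterwards. A tiny standalone check of that failure mode (plain tokio types, not Zebra's):

use tokio::sync::broadcast;

fn main() {
    let (queue_sender, queue_receiver) = broadcast::channel::<u32>(1);

    // Once the only receiver is gone (the queue runner has shut down),
    // send() returns an error; the RPC method ignores it and carries on.
    drop(queue_receiver);
    assert!(queue_sender.send(1).is_err());

    // Mirrors `let _ = queue_sender.send(unmined_transaction);` above.
    let _ = queue_sender.send(2);
}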

View File

@ -13,7 +13,7 @@ use std::{collections::HashSet, sync::Arc};
use chrono::Duration;
use indexmap::IndexMap;
use tokio::{
sync::broadcast::{channel, Receiver, Sender},
sync::broadcast::{self, error::TryRecvError},
time::Instant,
};
@ -55,24 +55,26 @@ pub struct Queue {
/// The runner processes the transactions in the queue.
pub struct Runner {
queue: Queue,
sender: Sender<Option<UnminedTx>>,
receiver: broadcast::Receiver<UnminedTx>,
tip_height: Height,
}
impl Queue {
/// Start a new queue
pub fn start() -> Runner {
let (sender, _receiver) = channel(CHANNEL_AND_QUEUE_CAPACITY);
pub fn start() -> (Runner, broadcast::Sender<UnminedTx>) {
let (sender, receiver) = broadcast::channel(CHANNEL_AND_QUEUE_CAPACITY);
let queue = Queue {
transactions: IndexMap::new(),
};
Runner {
let runner = Runner {
queue,
sender,
receiver,
tip_height: Height(0),
}
};
(runner, sender)
}
/// Get the transactions in the queue.
@ -103,16 +105,6 @@ impl Queue {
}
impl Runner {
/// Create a new sender for this runner.
pub fn sender(&self) -> Sender<Option<UnminedTx>> {
self.sender.clone()
}
/// Create a new receiver.
pub fn receiver(&self) -> Receiver<Option<UnminedTx>> {
self.sender.subscribe()
}
/// Get the queue transactions as a `HashSet` of unmined ids.
fn transactions_as_hash_set(&self) -> HashSet<UnminedTxId> {
let transactions = self.queue.transactions();
@ -157,8 +149,6 @@ impl Runner {
+ 'static,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
let mut receiver = self.sender.subscribe();
loop {
// if we don't have a chain use `NO_CHAIN_TIP_HEIGHT` to get block spacing
let tip_height = match tip.best_tip_height() {
@ -173,8 +163,23 @@ impl Runner {
tokio::time::sleep(spacing.to_std().expect("should never be less than zero")).await;
// get transactions from the channel
while let Ok(Some(tx)) = receiver.try_recv() {
let _ = &self.queue.insert(tx.clone());
loop {
let tx = match self.receiver.try_recv() {
Ok(tx) => tx,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Lagged(skipped_count)) => {
tracing::info!("sendrawtransaction queue was full: skipped {skipped_count} transactions");
continue;
}
Err(TryRecvError::Closed) => {
tracing::info!(
"sendrawtransaction queue was closed: is Zebra shutting down?"
);
return;
}
};
self.queue.insert(tx.clone());
}
// skip some work if stored tip height is the same as the one arriving
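
The runner's shutdown now comes entirely from channel semantics: the only `broadcast::Sender` lives in the `RpcImpl` (and its clones), so once those are dropped the channel closes, `try_recv()` starts returning `Closed`, and `run()` returns, which ends the queue task. A minimal standalone illustration of that behaviour, using tokio's broadcast channel directly rather than Zebra's types:

use tokio::sync::broadcast::{self, error::TryRecvError};

fn main() {
    let (sender, mut receiver) = broadcast::channel::<u32>(4);

    sender.send(1).expect("the receiver is still alive");
    drop(sender); // like dropping the RpcImpl that owns queue_sender

    // Values sent before the close are still delivered...
    assert!(matches!(receiver.try_recv(), Ok(1)));
    // ...and then the receiver reports Closed, so the runner loop can return.
    assert!(matches!(receiver.try_recv(), Err(TryRecvError::Closed)));
}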

View File

@ -34,7 +34,7 @@ proptest! {
#[test]
fn insert_remove_to_from_queue(transaction in any::<UnminedTx>()) {
// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();
// insert transaction
runner.queue.insert(transaction.clone());
@ -54,7 +54,7 @@ proptest! {
#[test]
fn queue_size_limit(transactions in any::<[UnminedTx; CHANNEL_AND_QUEUE_CAPACITY + 1]>()) {
// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();
// insert all transactions we have
transactions.iter().for_each(|t| runner.queue.insert(t.clone()));
@ -68,7 +68,7 @@ proptest! {
#[test]
fn queue_order(transactions in any::<[UnminedTx; 32]>()) {
// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();
// fill the queue and check insertion order
for i in 0..CHANNEL_AND_QUEUE_CAPACITY {
let transaction = transactions[i].clone();
@ -108,7 +108,7 @@ proptest! {
time::pause();
// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();
// insert a transaction to the queue
runner.queue.insert(transaction);
@ -165,7 +165,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();
// insert a transaction to the queue
let unmined_transaction = UnminedTx::from(transaction);
@ -246,7 +246,7 @@ proptest! {
let mut write_state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();
// insert a transaction to the queue
let unmined_transaction = UnminedTx::from(&transaction);
@ -320,7 +320,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
// create a queue
let mut runner = Queue::start();
let (mut runner, _sender) = Queue::start();
// insert a transaction to the queue
let unmined_transaction = UnminedTx::from(transaction.clone());
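
The updated tests bind the second element of `Queue::start()` to `_sender` instead of discarding it. That distinction matters: a named `_sender` binding lives until the end of the test, while a bare `_` in the pattern drops the sender immediately and closes the queue's channel. A small sketch of the difference (tokio broadcast types, not Zebra's):

use tokio::sync::broadcast::{self, error::TryRecvError};

fn main() {
    // Binding the sender (as the updated tests do with `_sender`) keeps the
    // channel open for the rest of the scope.
    let (_sender, mut receiver) = broadcast::channel::<u32>(1);
    assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty)));

    // Discarding it with a bare `_` in the pattern drops it right away,
    // which closes the channel and would shut the queue runner down.
    let (_, mut receiver) = broadcast::channel::<u32>(1);
    assert!(matches!(receiver.try_recv(), Err(TryRecvError::Closed)));
}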

View File

@ -7,15 +7,14 @@
//! See the full list of
//! [Differences between JSON-RPC 1.0 and 2.0.](https://www.simple-is-better.org/rpc/#differences-between-1-0-and-2-0)
use std::{panic, sync::Arc};
use std::{fmt, panic, sync::Arc};
use jsonrpc_core::{Compatibility, MetaIoHandler};
use jsonrpc_http_server::ServerBuilder;
use jsonrpc_http_server::{CloseHandle, ServerBuilder};
use tokio::task::JoinHandle;
use tower::{buffer::Buffer, Service};
use tracing::*;
use tracing_futures::Instrument;
use tracing::{Instrument, *};
use zebra_chain::{
block::{self, Block},
@ -40,11 +39,35 @@ mod tracing_middleware;
mod tests;
/// Zebra RPC Server
#[derive(Clone, Debug)]
pub struct RpcServer;
#[derive(Clone)]
pub struct RpcServer {
config: Config,
network: Network,
app_version: String,
close_handle: CloseHandle,
}
impl fmt::Debug for RpcServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RpcServer")
.field("config", &self.config)
.field("network", &self.network)
.field("app_version", &self.app_version)
.field(
"close_handle",
// TODO: when it stabilises, use std::any::type_name_of_val(&self.close_handle)
&"CloseHandle",
)
.finish()
}
}
impl RpcServer {
/// Start a new RPC server endpoint.
/// Start a new RPC server endpoint using the supplied configs and services.
/// `app_version` is a version string for the application, which is used in RPC responses.
///
/// Returns [`JoinHandle`]s for the RPC server and `sendrawtransaction` queue tasks,
/// and a [`RpcServer`] handle, which can be used to shut down the RPC server task.
//
// TODO: put some of the configs or services in their own struct?
#[allow(clippy::too_many_arguments)]
@ -62,9 +85,9 @@ impl RpcServer {
chain_verifier: ChainVerifier,
latest_chain_tip: Tip,
network: Network,
) -> (JoinHandle<()>, JoinHandle<()>)
) -> (JoinHandle<()>, JoinHandle<()>, Option<Self>)
where
Version: ToString + Clone,
Version: ToString + Clone + Send + 'static,
Mempool: tower::Service<
mempool::Request,
Response = mempool::Response,
@ -112,7 +135,7 @@ impl RpcServer {
// Initialize the rpc methods with the zebra version
let (rpc_impl, rpc_tx_queue_task_handle) = RpcImpl::new(
app_version,
app_version.clone(),
network,
config.debug_force_finished_sync,
mempool,
@ -129,18 +152,14 @@ impl RpcServer {
}
// The server is a blocking task, which blocks on executor shutdown.
// So we need to create and spawn it on a std::thread, inside a tokio blocking task.
// (Otherwise tokio panics when we shut down the RPC server.)
// So we need to start it in a std::thread.
// (Otherwise tokio panics on RPC port conflict, which shuts down the RPC server.)
let span = Span::current();
let server = move || {
let start_server = move || {
span.in_scope(|| {
// Use a different tokio executor from the rest of Zebra,
// so that large RPCs and any task handling bugs don't impact Zebra.
//
// TODO:
// - return server.close_handle(), which can shut down the RPC server,
// and add it to the server tests
let server = ServerBuilder::new(io)
let server_instance = ServerBuilder::new(io)
.threads(parallel_cpu_threads)
// TODO: disable this security check if we see errors from lightwalletd
//.allowed_hosts(DomainsValidation::Disabled)
@ -148,32 +167,117 @@ impl RpcServer {
.start_http(&listen_addr)
.expect("Unable to start RPC server");
info!("Opened RPC endpoint at {}", server.address());
info!("Opened RPC endpoint at {}", server_instance.address());
server.wait();
let close_handle = server_instance.close_handle();
info!("Stopping RPC endpoint");
let rpc_server_handle = RpcServer {
config,
network,
app_version: app_version.to_string(),
close_handle,
};
(server_instance, rpc_server_handle)
})
};
(
tokio::task::spawn_blocking(|| {
let thread_handle = std::thread::spawn(server);
// Propagate panics from the std::thread
let (server_instance, rpc_server_handle) = match std::thread::spawn(start_server).join()
{
Ok(rpc_server) => rpc_server,
Err(panic_object) => panic::resume_unwind(panic_object),
};
// Propagate panics from the inner std::thread to the outer tokio blocking task
match thread_handle.join() {
Ok(()) => (),
Err(panic_object) => panic::resume_unwind(panic_object),
}
}),
// The server is a blocking task, which blocks on executor shutdown.
// So we need to wait on it on a std::thread, inside a tokio blocking task.
// (Otherwise tokio panics when we shut down the RPC server.)
let span = Span::current();
let wait_on_server = move || {
span.in_scope(|| {
server_instance.wait();
info!("Stopped RPC endpoint");
})
};
let span = Span::current();
let rpc_server_task_handle = tokio::task::spawn_blocking(move || {
let thread_handle = std::thread::spawn(wait_on_server);
// Propagate panics from the inner std::thread to the outer tokio blocking task
span.in_scope(|| match thread_handle.join() {
Ok(()) => (),
Err(panic_object) => panic::resume_unwind(panic_object),
})
});
(
rpc_server_task_handle,
rpc_tx_queue_task_handle,
Some(rpc_server_handle),
)
} else {
// There is no RPC port, so the RPC tasks do nothing.
(
tokio::task::spawn(futures::future::pending().in_current_span()),
tokio::task::spawn(futures::future::pending().in_current_span()),
None,
)
}
}
/// Shut down this RPC server, blocking the current thread.
///
/// This method can be called from within a tokio executor without panicking.
/// But it is blocking, so `shutdown()` should be used instead.
pub fn shutdown_blocking(&self) {
Self::shutdown_blocking_inner(self.close_handle.clone())
}
/// Shut down this RPC server asynchronously.
/// Returns a task that completes when the server is shut down.
pub fn shutdown(&self) -> JoinHandle<()> {
let close_handle = self.close_handle.clone();
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(|| Self::shutdown_blocking_inner(close_handle))
})
}
/// Shuts down this RPC server using its `close_handle`.
///
/// See `shutdown_blocking()` for details.
fn shutdown_blocking_inner(close_handle: CloseHandle) {
// The server is a blocking task, so it can't run inside a tokio thread.
// See the note at wait_on_server.
let span = Span::current();
let wait_on_shutdown = move || {
span.in_scope(|| {
info!("Stopping RPC server");
close_handle.clone().close();
info!("Stopped RPC server");
})
};
let span = Span::current();
let thread_handle = std::thread::spawn(wait_on_shutdown);
// Propagate panics from the inner std::thread to the outer tokio blocking task
span.in_scope(|| match thread_handle.join() {
Ok(()) => (),
Err(panic_object) => panic::resume_unwind(panic_object),
})
}
}
impl Drop for RpcServer {
fn drop(&mut self) {
// Block on shutting down, propagating panics.
// This can take around 150 seconds.
//
// Without this shutdown, Zebra's RPC unit tests sometimes crashed with memory errors.
self.shutdown_blocking();
}
}
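
The start, wait, and shutdown paths above all repeat one shape: do the blocking jsonrpc work on a dedicated `std::thread`, wrap the `join()` in a tokio blocking task, and re-raise any panic with `panic::resume_unwind` so it surfaces through the returned `JoinHandle`. A stripped-down sketch of that shape as a hypothetical helper (the crate does not define such a function, and the real code also re-enters the current tracing span):

use std::panic;
use tokio::task::JoinHandle;

/// Run blocking `work` on its own OS thread, inside a tokio blocking task,
/// and propagate any panic from the inner thread to the returned handle.
fn spawn_blocking_thread<F>(work: F) -> JoinHandle<()>
where
    F: FnOnce() + Send + 'static,
{
    tokio::task::spawn_blocking(move || {
        let thread_handle = std::thread::spawn(work);

        // join() yields Err(panic payload) if `work` panicked; resume_unwind
        // re-raises it, so awaiting the JoinHandle reports the panic too.
        match thread_handle.join() {
            Ok(()) => (),
            Err(panic_object) => panic::resume_unwind(panic_object),
        }
    })
}

Like the calls in the diff, this has to run inside a tokio runtime, since `spawn_blocking` requires one.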

View File

@ -51,7 +51,7 @@ fn rpc_server_spawn(parallel_cpu_threads: bool) {
info!("spawning RPC server...");
let (rpc_server_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn(
let (rpc_server_task_handle, rpc_tx_queue_task_handle, _rpc_server) = RpcServer::spawn(
config,
Default::default(),
"RPC server test",
@ -74,9 +74,6 @@ fn rpc_server_spawn(parallel_cpu_threads: bool) {
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
// TODO: when we return server.close_handle(), use it to shut down the server here,
// and remove the shutdown timeout
});
info!("waiting for RPC server to shut down...");
@ -87,21 +84,36 @@ fn rpc_server_spawn(parallel_cpu_threads: bool) {
/// on an OS-assigned unallocated port.
#[test]
fn rpc_server_spawn_unallocated_port_single_thread() {
rpc_server_spawn_unallocated_port(false)
rpc_server_spawn_unallocated_port(false, false)
}
/// Test that the JSON-RPC server spawn when configured with multiple threads,
/// Test that the JSON-RPC server spawns and shuts down when configured with a single thread,
/// on an OS-assigned unallocated port.
#[test]
fn rpc_server_spawn_unallocated_port_single_thread_shutdown() {
rpc_server_spawn_unallocated_port(false, true)
}
/// Test that the JSON-RPC server spawns when configured with multiple threads,
/// on an OS-assigned unallocated port.
#[test]
fn rpc_sever_spawn_unallocated_port_parallel_threads() {
rpc_server_spawn_unallocated_port(true)
rpc_server_spawn_unallocated_port(true, false)
}
/// Test that the JSON-RPC server spawns and shuts down when configured with multiple threads,
/// on an OS-assigned unallocated port.
#[test]
fn rpc_sever_spawn_unallocated_port_parallel_threads_shutdown() {
rpc_server_spawn_unallocated_port(true, true)
}
/// Test if the RPC server will spawn on an OS-assigned unallocated port.
///
/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores.
/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores,
/// and `do_shutdown` to true to close the server using the close handle.
#[tracing::instrument]
fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool) {
fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool, do_shutdown: bool) {
let _init_guard = zebra_test::init();
let port = zebra_test::net::random_unallocated_port();
@ -123,7 +135,7 @@ fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool) {
info!("spawning RPC server...");
let (rpc_server_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn(
let (rpc_server_task_handle, rpc_tx_queue_task_handle, rpc_server) = RpcServer::spawn(
config,
Default::default(),
"RPC server test",
@ -140,15 +152,33 @@ fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool) {
state.expect_no_requests().await;
chain_verifier.expect_no_requests().await;
// The server and queue tasks should continue without errors or panics
let rpc_server_task_result = rpc_server_task_handle.now_or_never();
assert!(matches!(rpc_server_task_result, None));
if do_shutdown {
rpc_server
.expect("unexpected missing RpcServer for configured RPC port")
.shutdown()
.await
.expect("unexpected panic during RpcServer shutdown");
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
// The server and queue tasks should shut down without errors or panics
let rpc_server_task_result = rpc_server_task_handle.await;
assert!(
matches!(rpc_server_task_result, Ok(())),
"unexpected server task panic during shutdown: {rpc_server_task_result:?}"
);
// TODO: when we return server.close_handle(), use it to shut down the server here
// and remove the shutdown timeout
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.await;
assert!(
matches!(rpc_tx_queue_task_result, Ok(())),
"unexpected queue task panic during shutdown: {rpc_tx_queue_task_result:?}"
);
} else {
// The server and queue tasks should continue without errors or panics
let rpc_server_task_result = rpc_server_task_handle.now_or_never();
assert!(matches!(rpc_server_task_result, None));
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
}
});
info!("waiting for RPC server to shut down...");
@ -182,22 +212,23 @@ fn rpc_server_spawn_port_conflict() {
info!("spawning RPC server 1...");
let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle) = RpcServer::spawn(
config.clone(),
Default::default(),
"RPC server 1 test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(chain_verifier.clone(), 1),
NoChainTip,
Mainnet,
);
let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle, _rpc_server) =
RpcServer::spawn(
config.clone(),
Default::default(),
"RPC server 1 test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(chain_verifier.clone(), 1),
NoChainTip,
Mainnet,
);
tokio::time::sleep(Duration::from_secs(3)).await;
info!("spawning conflicted RPC server 2...");
let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle) = RpcServer::spawn(
let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle, _rpc_server) = RpcServer::spawn(
config,
Default::default(),
"RPC server 2 conflict test",
@ -285,22 +316,23 @@ fn rpc_server_spawn_port_conflict_parallel_auto() {
info!("spawning parallel RPC server 1...");
let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle) = RpcServer::spawn(
config.clone(),
Default::default(),
"RPC server 1 test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(chain_verifier.clone(), 1),
NoChainTip,
Mainnet,
);
let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle, _rpc_server) =
RpcServer::spawn(
config.clone(),
Default::default(),
"RPC server 1 test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(chain_verifier.clone(), 1),
NoChainTip,
Mainnet,
);
tokio::time::sleep(Duration::from_secs(3)).await;
info!("spawning parallel conflicted RPC server 2...");
let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle) = RpcServer::spawn(
let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle, _rpc_server) = RpcServer::spawn(
config,
Default::default(),
"RPC server 2 conflict test",

View File

@ -176,7 +176,7 @@ impl StartCmd {
.service(mempool);
// Launch RPC server
let (rpc_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn(
let (rpc_task_handle, rpc_tx_queue_task_handle, rpc_server) = RpcServer::spawn(
config.rpc,
#[cfg(feature = "getblocktemplate-rpcs")]
config.mining,
@ -356,6 +356,14 @@ impl StartCmd {
groth16_download_handle.abort();
old_databases_task_handle.abort();
// Wait until the RPC server shuts down.
// This can take around 150 seconds.
//
// Without this shutdown, Zebra's RPC unit tests sometimes crashed with memory errors.
if let Some(rpc_server) = rpc_server {
rpc_server.shutdown_blocking();
}
exit_status
}