change(zebrad): opens the database in a blocking tokio thread, which allows tokio to run other tasks (#5228)

* use spawn_blocking to run zebra_state::init from start cmd

* uses zebra_state::spawn_init in copy-state command

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
Arya 2022-09-26 11:45:42 -04:00 committed by GitHub
parent 9cb6c559f4
commit ec115e930f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 4 deletions

View File

@ -34,7 +34,7 @@ pub use request::{FinalizedBlock, HashOrHeight, PreparedBlock, ReadRequest, Requ
pub use response::{ReadResponse, Response};
pub use service::{
chain_tip::{ChainTipChange, LatestChainTip, TipAction},
init, OutputIndex, OutputLocation, TransactionLocation,
init, spawn_init, OutputIndex, OutputLocation, TransactionLocation,
};
#[cfg(any(test, feature = "proptest-impl"))]

View File

@ -1292,6 +1292,21 @@ pub fn init(
)
}
/// Calls [`init`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a boxed state service,
/// a read state service, and receivers for state chain tip updates.
pub fn spawn_init(
config: Config,
network: Network,
) -> tokio::task::JoinHandle<(
BoxService<Request, Response, BoxError>,
ReadStateService,
LatestChainTip,
ChainTipChange,
)> {
tokio::task::spawn_blocking(move || init(config, network))
}
/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
///
/// This can be used to create a state service for testing.

View File

@ -117,7 +117,7 @@ impl CopyStateCmd {
_source_read_only_state_service,
_source_latest_chain_tip,
_source_chain_tip_change,
) = old_zs::init(source_config.clone(), network);
) = old_zs::spawn_init(source_config.clone(), network).await?;
let elapsed = source_start_time.elapsed();
info!(?elapsed, "finished initializing source state service");
@ -136,7 +136,7 @@ impl CopyStateCmd {
_target_read_only_state_service,
_target_latest_chain_tip,
_target_chain_tip_change,
) = new_zs::init(target_config.clone(), network);
) = new_zs::spawn_init(target_config.clone(), network).await?;
let elapsed = target_start_time.elapsed();
info!(?elapsed, "finished initializing target state service");

View File

@ -103,8 +103,11 @@ impl StartCmd {
info!(?config);
info!("initializing node state");
info!("opening database, this may take a couple minutes");
let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
zebra_state::init(config.state.clone(), config.network.network);
zebra_state::spawn_init(config.state.clone(), config.network.network).await?;
let state = ServiceBuilder::new()
.buffer(Self::state_buffer_bound())
.service(state_service);

View File

@ -313,6 +313,31 @@ fn start_args() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn db_init_outside_future_executor() -> Result<()> {
use std::time::{Duration, Instant};
let _init_guard = zebra_test::init();
let config = default_test_config()?;
let start = Instant::now();
let db_init_handle = zebra_state::spawn_init(config.state.clone(), config.network.network);
// it's faster to panic if it takes longer than expected, since the executor
// will wait indefinitely for blocking operation to finish once started
let block_duration = start.elapsed();
assert!(
block_duration < Duration::from_millis(5),
"futures executor was blocked longer than expected ({:?})",
block_duration,
);
db_init_handle.await?;
Ok(())
}
#[test]
fn persistent_mode() -> Result<()> {
let _init_guard = zebra_test::init();