Fix a deadlock in TokioComponent.

The components are accessed by a lock on application state.  When some command
calls block_on to enter an async context, it obtained a write lock on the
entire application state.  This meant that if the application state were
accessed later in an async context, a deadlock would occur.  Instead the
TokioComponent holds an Option<Runtime> now, so that before calling block_on,
the caller can .take() the runtime and release the lock.  Since we only ever
enter an async context once, it's not a problem that the component is then
missing its runtime, as once we are inside of a task we can access the runtime.
This commit is contained in:
Henry de Valence 2020-01-13 09:54:27 -08:00
parent ab3db201ee
commit 4fcb550aa6
7 changed files with 49 additions and 35 deletions

View File

@ -3,7 +3,7 @@
use crate::{commands::ZebradCmd, config::ZebradConfig}; use crate::{commands::ZebradCmd, config::ZebradConfig};
use abscissa_core::{ use abscissa_core::{
application::{self, AppCell}, application::{self, AppCell},
config, trace, Application, Component, EntryPoint, FrameworkError, StandardPaths, config, trace, Application, EntryPoint, FrameworkError, StandardPaths,
}; };
/// Application state /// Application state

View File

@ -26,13 +26,18 @@ impl Runnable for ConnectCmd {
info!(connect.addr = ?self.addr); info!(connect.addr = ?self.addr);
use crate::components::tokio::TokioComponent; use crate::components::tokio::TokioComponent;
let _ = app_writer() let rt = app_writer()
.state_mut() .state_mut()
.components .components
.get_downcast_mut::<TokioComponent>() .get_downcast_mut::<TokioComponent>()
.expect("TokioComponent should be available") .expect("TokioComponent should be available")
.rt .rt
.block_on(self.connect()); .take();
rt.expect("runtime should not already be taken")
.block_on(self.connect())
// Surface any error that occurred executing the future.
.unwrap();
} }
} }
@ -44,11 +49,9 @@ impl ConnectCmd {
use tower::{buffer::Buffer, service_fn, Service, ServiceExt}; use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
let node = Buffer::new( let node = Buffer::new(
service_fn(|req| { service_fn(|req| async move {
async move { info!(?req);
info!(?req); Ok::<Response, Error>(Response::Ok)
Ok::<Response, Error>(Response::Ok)
}
}), }),
1, 1,
); );

View File

@ -112,13 +112,18 @@ impl Runnable for SeedCmd {
fn run(&self) { fn run(&self) {
use crate::components::tokio::TokioComponent; use crate::components::tokio::TokioComponent;
let _ = app_writer() let rt = app_writer()
.state_mut() .state_mut()
.components .components
.get_downcast_mut::<TokioComponent>() .get_downcast_mut::<TokioComponent>()
.expect("TokioComponent should be available") .expect("TokioComponent should be available")
.rt .rt
.block_on(self.seed()); .take();
rt.expect("runtime should not already be taken")
.block_on(self.seed())
// Surface any error that occurred executing the future.
.unwrap();
} }
} }

View File

@ -40,12 +40,15 @@ impl Runnable for StartCmd {
use crate::components::tokio::TokioComponent; use crate::components::tokio::TokioComponent;
app_writer() let rt = app_writer()
.state_mut() .state_mut()
.components .components
.get_downcast_mut::<TokioComponent>() .get_downcast_mut::<TokioComponent>()
.expect("TokioComponent should be available") .expect("TokioComponent should be available")
.rt .rt
.take();
rt.expect("runtime should not already be taken")
.block_on(future::pending::<()>()); .block_on(future::pending::<()>());
} }
} }

View File

@ -5,15 +5,21 @@ use abscissa_core::{Component, FrameworkError};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
/// An Abscissa component which owns a Tokio runtime. /// An Abscissa component which owns a Tokio runtime.
///
/// The runtime is stored as an `Option` so that when it's time to enter an async
/// context by calling `block_on` with a "root future", the runtime can be taken
/// independently of Abscissa's component locking system. Otherwise whatever
/// calls `block_on` holds an application lock for the entire lifetime of the
/// async context.
#[derive(Component, Debug)] #[derive(Component, Debug)]
pub struct TokioComponent { pub struct TokioComponent {
pub rt: Runtime, pub rt: Option<Runtime>,
} }
impl TokioComponent { impl TokioComponent {
pub fn new() -> Result<Self, FrameworkError> { pub fn new() -> Result<Self, FrameworkError> {
Ok(Self { Ok(Self {
rt: Runtime::new().unwrap(), rt: Some(Runtime::new().unwrap()),
}) })
} }
} }

View File

@ -47,23 +47,27 @@ impl TracingEndpoint {
.parse() .parse()
.expect("Hardcoded address should be parseable"); .expect("Hardcoded address should be parseable");
tokio_component.rt.spawn(async move { tokio_component
// try_bind uses the tokio runtime, so we .rt
// need to construct it inside the task. .as_ref()
let server = match Server::try_bind(&addr) { .expect("runtime should not be taken")
Ok(s) => s, .spawn(async move {
Err(e) => { // try_bind uses the tokio runtime, so we
error!("Could not open tracing endpoint listener"); // need to construct it inside the task.
error!("Error: {}", e); let server = match Server::try_bind(&addr) {
return; Ok(s) => s,
Err(e) => {
error!("Could not open tracing endpoint listener");
error!("Error: {}", e);
return;
}
} }
} .serve(service);
.serve(service);
if let Err(e) = server.await { if let Err(e) = server.await {
error!("Server error: {}", e); error!("Server error: {}", e);
} }
}); });
Ok(()) Ok(())
} }

View File

@ -6,10 +6,3 @@ pub use crate::application::{app_config, app_reader, app_writer};
/// Commonly used Abscissa traits /// Commonly used Abscissa traits
pub use abscissa_core::{Application, Command, Runnable}; pub use abscissa_core::{Application, Command, Runnable};
/// Type alias to make working with tower traits easier.
///
/// Note: the 'static lifetime bound means that the *type* cannot have any
/// non-'static lifetimes, (e.g., when a type contains a borrow and is
/// parameterized by 'a), *not* that the object itself has 'static lifetime.
pub(crate) use abscissa_core::error::BoxError;