buffer: fix panics if worker executor drops unexpectedly (#190)

This commit is contained in:
Sean McArthur 2019-03-08 18:18:01 -08:00 committed by GitHub
parent ac9488f5f2
commit 20102a647b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 94 additions and 38 deletions

View File

@ -6,10 +6,15 @@ use std::sync::Arc;
/// An error produced by a `Service` wrapped by a `Buffer` /// An error produced by a `Service` wrapped by a `Buffer`
#[derive(Debug)] #[derive(Debug)]
pub struct ServiceError { pub struct ServiceError {
method: &'static str,
inner: Arc<Error>, inner: Arc<Error>,
} }
/// An error when the buffer's worker closes unexpectedly.
#[derive(Debug)]
pub struct Closed {
_p: (),
}
/// Error produced when spawning the worker fails /// Error produced when spawning the worker fails
#[derive(Debug)] #[derive(Debug)]
pub struct SpawnError<T> { pub struct SpawnError<T> {
@ -22,15 +27,14 @@ pub(crate) type Error = Box<::std::error::Error + Send + Sync>;
// ===== impl ServiceError ===== // ===== impl ServiceError =====
impl ServiceError { impl ServiceError {
pub(crate) fn new(method: &'static str, inner: Error) -> ServiceError { pub(crate) fn new(inner: Error) -> ServiceError {
let inner = Arc::new(inner); let inner = Arc::new(inner);
ServiceError { method, inner } ServiceError { inner }
} }
/// Private to avoid exposing `Clone` trait as part of the public API /// Private to avoid exposing `Clone` trait as part of the public API
pub(crate) fn clone(&self) -> ServiceError { pub(crate) fn clone(&self) -> ServiceError {
ServiceError { ServiceError {
method: self.method,
inner: self.inner.clone(), inner: self.inner.clone(),
} }
} }
@ -38,7 +42,7 @@ impl ServiceError {
impl fmt::Display for ServiceError { impl fmt::Display for ServiceError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "Service::{} failed: {}", self.method, self.inner) write!(fmt, "buffered service failed: {}", self.inner)
} }
} }
@ -48,6 +52,22 @@ impl std::error::Error for ServiceError {
} }
} }
// ===== impl Closed =====
impl Closed {
pub(crate) fn new() -> Self {
Closed { _p: () }
}
}
impl fmt::Display for Closed {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.write_str("buffer's worker closed unexpectedly")
}
}
impl std::error::Error for Closed {}
// ===== impl SpawnError ===== // ===== impl SpawnError =====
impl<T> SpawnError<T> { impl<T> SpawnError<T> {

View File

@ -1,6 +1,6 @@
//! Future types //! Future types
use error::{Error, ServiceError}; use error::{Closed, Error};
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use message; use message;
@ -10,7 +10,7 @@ pub struct ResponseFuture<T> {
} }
enum ResponseState<T> { enum ResponseState<T> {
Failed(ServiceError), Failed(Option<Error>),
Rx(message::Rx<T>), Rx(message::Rx<T>),
Poll(T), Poll(T),
} }
@ -26,9 +26,9 @@ where
} }
} }
pub(crate) fn failed(err: ServiceError) -> Self { pub(crate) fn failed(err: Error) -> Self {
ResponseFuture { ResponseFuture {
state: ResponseState::Failed(err), state: ResponseState::Failed(Some(err)),
} }
} }
} }
@ -48,16 +48,14 @@ where
let fut; let fut;
match self.state { match self.state {
Failed(ref e) => { Failed(ref mut e) => {
return Err(e.clone().into()); return Err(e.take().expect("polled after error"));
} }
Rx(ref mut rx) => match rx.poll() { Rx(ref mut rx) => match rx.poll() {
Ok(Async::Ready(Ok(f))) => fut = f, Ok(Async::Ready(Ok(f))) => fut = f,
Ok(Async::Ready(Err(e))) => return Err(e.into()), Ok(Async::Ready(Err(e))) => return Err(e.into()),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => unreachable!( Err(_) => return Err(Closed::new().into()),
"Worker exited without sending error to all outstanding requests."
),
}, },
Poll(ref mut fut) => { Poll(ref mut fut) => {
return fut.poll().map_err(Into::into); return fut.poll().map_err(Into::into);

View File

@ -140,7 +140,7 @@ where
// If the inner service has errored, then we error here. // If the inner service has errored, then we error here.
self.tx self.tx
.poll_ready() .poll_ready()
.map_err(move |_| self.worker.get_error_on_closed().into()) .map_err(|_| self.worker.get_error_on_closed())
} }
fn call(&mut self, request: Request) -> Self::Future { fn call(&mut self, request: Request) -> Self::Future {

View File

@ -1,4 +1,4 @@
use error::{Error, ServiceError}; use error::{Closed, Error, ServiceError};
use futures::future::Executor; use futures::future::Executor;
use futures::{Async, Future, Poll, Stream}; use futures::{Async, Future, Poll, Stream};
use message::Message; use message::Message;
@ -106,8 +106,8 @@ where
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} }
fn failed(&mut self, method: &'static str, error: T::Error) { fn failed(&mut self, error: T::Error) {
// The underlying service failed when we called `method` on it with the given `error`. We // The underlying service failed when we called `poll_ready` on it with the given `error`. We
// need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in // need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
// an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent // an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent
// requests will also fail with the same error. // requests will also fail with the same error.
@ -119,7 +119,7 @@ where
// request. We do this by *first* exposing the error, *then* closing the channel used to // request. We do this by *first* exposing the error, *then* closing the channel used to
// send more requests (so the client will see the error when the send fails), and *then* // send more requests (so the client will see the error when the send fails), and *then*
// sending the error to all outstanding requests. // sending the error to all outstanding requests.
let error = ServiceError::new(method, error.into()); let error = ServiceError::new(error.into());
let mut inner = self.handle.inner.lock().unwrap(); let mut inner = self.handle.inner.lock().unwrap();
@ -154,8 +154,8 @@ where
} }
loop { loop {
match self.poll_next_msg()? { match try_ready!(self.poll_next_msg()) {
Async::Ready(Some(msg)) => { Some(msg) => {
if let Some(ref failed) = self.failed { if let Some(ref failed) = self.failed {
let _ = msg.tx.send(Err(failed.clone())); let _ = msg.tx.send(Err(failed.clone()));
continue; continue;
@ -171,7 +171,6 @@ where
// An error means the request had been canceled in-between // An error means the request had been canceled in-between
// our calls, the response future will just be dropped. // our calls, the response future will just be dropped.
let _ = msg.tx.send(Ok(response)); let _ = msg.tx.send(Ok(response));
continue;
} }
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
// Put out current message back in its slot. // Put out current message back in its slot.
@ -179,7 +178,7 @@ where
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
Err(e) => { Err(e) => {
self.failed("poll_ready", e); self.failed(e);
let _ = msg.tx.send(Err(self let _ = msg.tx.send(Err(self
.failed .failed
.as_ref() .as_ref()
@ -188,33 +187,24 @@ where
} }
} }
} }
Async::Ready(None) => { None => {
// No more more requests _ever_. // No more more requests _ever_.
self.finish = true; self.finish = true;
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
} }
Async::NotReady if self.failed.is_some() => {
// No need to poll the service as it has already failed.
return Ok(Async::NotReady);
}
Async::NotReady => {
// We don't have any new requests to enqueue.
// So we yield.
return Ok(Async::NotReady);
}
} }
} }
} }
} }
impl Handle { impl Handle {
pub(crate) fn get_error_on_closed(&self) -> ServiceError { pub(crate) fn get_error_on_closed(&self) -> Error {
self.inner self.inner
.lock() .lock()
.unwrap() .unwrap()
.as_ref() .as_ref()
.expect("Worker exited, but did not set error.") .map(|svc_err| svc_err.clone().into())
.clone() .unwrap_or_else(|| Closed::new().into())
} }
} }

View File

@ -7,6 +7,7 @@ use futures::prelude::*;
use tower_buffer::*; use tower_buffer::*;
use tower_service::*; use tower_service::*;
use std::cell::RefCell;
use std::thread; use std::thread;
#[test] #[test]
@ -98,8 +99,6 @@ fn when_inner_fails() {
with_task(|| { with_task(|| {
let e = res1.poll().unwrap_err(); let e = res1.poll().unwrap_err();
if let Some(e) = e.downcast_ref::<error::ServiceError>() { if let Some(e) = e.downcast_ref::<error::ServiceError>() {
assert!(format!("{}", e).contains("poll_ready"));
let e = e.source().unwrap(); let e = e.source().unwrap();
assert_eq!(e.to_string(), "foobar"); assert_eq!(e.to_string(), "foobar");
@ -109,6 +108,42 @@ fn when_inner_fails() {
}); });
} }
#[test]
fn poll_ready_when_worker_is_dropped_early() {
let (service, _handle) = Mock::new();
// drop that worker right on the floor!
let exec = ExecFn(drop);
let mut service = Buffer::with_executor(service, 1, &exec).unwrap();
with_task(|| {
service
.poll_ready()
.expect_err("buffer poll_ready should error");
});
}
#[test]
fn response_future_when_worker_is_dropped_early() {
let (service, mut handle) = Mock::new();
// hold the worker in a cell until we want to drop it later
let cell = RefCell::new(None);
let exec = ExecFn(|fut| *cell.borrow_mut() = Some(fut));
let mut service = Buffer::with_executor(service, 1, &exec).unwrap();
// keep the request in the worker
handle.allow(0);
let response = service.call("hello");
// drop the worker (like an executor closing up)
cell.borrow_mut().take();
response.wait().expect_err("res.wait");
}
type Mock = tower_mock::Mock<&'static str, &'static str>; type Mock = tower_mock::Mock<&'static str, &'static str>;
type Handle = tower_mock::Handle<&'static str, &'static str>; type Handle = tower_mock::Handle<&'static str, &'static str>;
@ -126,6 +161,19 @@ where
} }
} }
struct ExecFn<Func>(Func);
impl<Func, F> futures::future::Executor<F> for ExecFn<Func>
where
Func: Fn(F),
F: Future<Item = (), Error = ()> + Send + 'static,
{
fn execute(&self, fut: F) -> Result<(), futures::future::ExecuteError<F>> {
(self.0)(fut);
Ok(())
}
}
fn new_service() -> (Buffer<Mock, &'static str>, Handle) { fn new_service() -> (Buffer<Mock, &'static str>, Handle) {
let (service, handle) = Mock::new(); let (service, handle) = Mock::new();
// bound is >0 here because clears_canceled_requests needs multiple outstanding requests // bound is >0 here because clears_canceled_requests needs multiple outstanding requests