Expose Service errors through tower-buffer (#143)

Previously, any error returned by a `Service` wrapped in a
`tower_buffer::Buffer` was silently swallowed, and the handles were
simply informed that the connection to the `Service` was closed.

This patch captures errors from a wrapped `Service` and communicates
that error to all pending and future requests. It does so by wrapping
the error in an `Arc`, which is sent to all pending `oneshot` request
channels and stored in a shared location so that future requests see
the error when their send to the `Worker` fails.
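
As a minimal sketch of the pattern (hypothetical names, not the exact
types this patch adds): the first failure is published once into a
shared cell, then broadcast to every request still waiting on its
`oneshot`:

    // Simplified, illustrative sketch of the error-sharing pattern.
    extern crate futures;
    extern crate lazycell;

    use futures::sync::oneshot;
    use std::sync::Arc;

    struct Shared<E> {
        // Filled exactly once, by whichever call observes the failure first.
        err: lazycell::AtomicLazyCell<Arc<E>>,
    }

    fn fail_all<T, E>(
        shared: &Shared<E>,
        pending: Vec<oneshot::Sender<Result<T, Arc<E>>>>,
        error: E,
    ) {
        let error = Arc::new(error);
        // Publish the error first, so requests that arrive later can find it...
        let _ = shared.err.fill(error.clone());
        // ...then fail every request that was already queued.
        for tx in pending {
            let _ = tx.send(Err(error.clone()));
        }
    }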

Note that this patch also removes the `open` field from `State`, as it
is no longer necessary following #120: bounded channels have a
`poll_ready` method we can rely on instead.
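
For context, futures 0.1's bounded `mpsc::Sender` reports disconnection
through `poll_ready`, which is exactly what makes the flag redundant. A
small self-contained demonstration (assuming futures 0.1; the setup is
illustrative):

    extern crate futures;

    use futures::sync::mpsc;
    use futures::{future, Future};

    fn main() {
        let (mut tx, rx) = mpsc::channel::<u32>(1);
        drop(rx); // simulate the Worker task going away

        // poll_ready must be called in task context, hence future::lazy.
        let res = future::lazy(move || future::result(tx.poll_ready().map(|_| ())))
            .wait();

        // Err here plays the role the old `open == false` flag did.
        assert!(res.is_err());
    }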

Note that this change is not entirely backwards compatible -- the error
type of a `Service` wrapped in `Buffer` must now be `Send + Sync` so
that it can safely be communicated back to callers. Furthermore,
`tower_buffer::Error::Closed` now carries the error that the failed
`Service` produced, which may break code that matches on it.
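
A sketch of caller-side handling under the new variants (the helper
function is hypothetical; the `Error` variants and `ServiceError::error`
are from this patch):

    extern crate tower_buffer;

    use std::fmt::Debug;
    use tower_buffer::Error;

    // Hypothetical helper: how a caller can now inspect why the buffer closed.
    fn describe<E: Debug>(err: &Error<E>) -> String {
        match *err {
            // The same Arc<ServiceError<E>> is shared with every caller.
            Error::Closed(ref e) => format!("inner service failed: {:?}", e.error()),
            Error::Full => "buffer at capacity; wait for poll_ready".into(),
            Error::Inner(ref e) => format!("this call failed: {:?}", e),
        }
    }

Old code that matched `Error::Closed` as a unit variant will need to
bind the new payload.
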
Jon Gjengset 2019-01-16 11:31:42 -05:00 committed by GitHub
parent 9f422d29b1
commit d42f48bbbb
5 changed files with 204 additions and 94 deletions

View File

@@ -96,8 +96,9 @@ where
D: Discover + Send + 'static,
D::Key: Send,
D::Service: Service<Req, Response = Rsp> + Send,
D::Error: Send,
D::Error: Send + Sync,
<D::Service as Service<Req>>::Future: Send,
<D::Service as Service<Req>>::Error: Send + Sync,
C: lb::Choose<D::Key, D::Service> + Send + 'static,
{
println!("{}", name);
@@ -243,8 +244,9 @@ where
D::Service: Service<Req>,
D::Key: Send,
D::Service: Send,
D::Error: Send,
D::Error: Send + Sync,
<D::Service as Service<Req>>::Future: Send,
<D::Service as Service<Req>>::Error: Send + Sync,
C: lb::Choose<D::Key, D::Service> + Send + 'static,
{
pub fn new(lb: lb::Balance<D, C>, total: usize, concurrency: usize) -> Self {

View File

@@ -9,6 +9,7 @@ futures = "0.1"
tower-service = { version = "0.2", path = "../tower-service" }
tower-direct-service = { version = "0.1", path = "../tower-direct-service" }
tokio-executor = "0.1"
lazycell = "1.2"
[dev-dependencies]
tower-mock = { version = "0.1", path = "../tower-mock" }

View File

@@ -7,22 +7,21 @@
#[macro_use]
extern crate futures;
extern crate tower_service;
extern crate lazycell;
extern crate tokio_executor;
extern crate tower_direct_service;
extern crate tower_service;
use futures::future::Executor;
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::{Async, Future, Poll, Stream};
use std::marker::PhantomData;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::{error, fmt};
use tower_service::Service;
use tokio_executor::DefaultExecutor;
use tower_direct_service::DirectService;
use tower_service::Service;
/// Adds a buffer in front of an inner service.
///
@@ -31,25 +30,41 @@ pub struct Buffer<T, Request>
where
T: Service<Request>,
{
tx: mpsc::Sender<Message<Request, T::Future>>,
state: Arc<State>,
tx: mpsc::Sender<Message<Request, T::Future, T::Error>>,
state: Arc<State<T::Error>>,
}
/// A [`Buffer`] that is backed by a `DirectService`.
pub type DirectBuffer<T, Request> = Buffer<DirectServiceRef<T>, Request>;
/// Future eventually completed with the response to the original request.
pub struct ResponseFuture<T> {
state: ResponseState<T>,
pub struct ResponseFuture<T, E> {
state: ResponseState<T, E>,
}
/// An error produced by a `Service` wrapped by a `Buffer`
#[derive(Debug)]
pub struct ServiceError<E> {
method: &'static str,
inner: E,
}
impl<E> ServiceError<E> {
/// The error produced by the `Service` when `method` was called.
pub fn error(&self) -> &E {
&self.inner
}
}
/// Errors produced by `Buffer`.
#[derive(Debug)]
pub enum Error<T> {
pub enum Error<E> {
/// The `Service` call errored.
Inner(T),
/// The underlying `Service` failed.
Closed,
Inner(E),
/// The underlying `Service` failed. All subsequent requests will fail.
Closed(Arc<ServiceError<E>>),
/// The underlying `Service` is currently at capacity; wait for `poll_ready`.
Full,
}
/// An adapter that exposes the associated types of a `DirectService` through `Service`.
@@ -120,11 +135,12 @@ mod sealed {
where
T: DirectService<Request>,
{
pub(crate) current_message: Option<Message<Request, T::Future>>,
pub(crate) rx: mpsc::Receiver<Message<Request, T::Future>>,
pub(crate) current_message: Option<Message<Request, T::Future, T::Error>>,
pub(crate) rx: mpsc::Receiver<Message<Request, T::Future, T::Error>>,
pub(crate) service: T,
pub(crate) finish: bool,
pub(crate) state: Arc<State>,
pub(crate) failed: Option<Arc<ServiceError<T::Error>>>,
pub(crate) state: Arc<State<T::Error>>,
}
}
use sealed::Worker;
@@ -150,19 +166,20 @@ pub struct SpawnError<T> {
/// Message sent over buffer
#[derive(Debug)]
struct Message<Request, Fut> {
struct Message<Request, Fut, E> {
request: Request,
tx: oneshot::Sender<Fut>,
tx: oneshot::Sender<Result<Fut, Arc<ServiceError<E>>>>,
}
/// State shared between `Buffer` and `Worker`
struct State {
open: AtomicBool,
struct State<E> {
err: lazycell::AtomicLazyCell<Arc<ServiceError<E>>>,
}
enum ResponseState<T> {
Failed,
Rx(oneshot::Receiver<T>),
enum ResponseState<T, E> {
Full,
Failed(Arc<ServiceError<E>>),
Rx(oneshot::Receiver<Result<T, Arc<ServiceError<E>>>>),
Poll(T),
}
@@ -181,6 +198,7 @@ where
where
T: Send + 'static,
T::Future: Send,
T::Error: Send + Sync,
Request: Send + 'static,
{
Self::with_executor(service, bound, &DefaultExecutor::current())
@@ -201,23 +219,22 @@ where
let (tx, rx) = mpsc::channel(bound);
let state = Arc::new(State {
open: AtomicBool::new(true),
err: lazycell::AtomicLazyCell::new(),
});
match Worker::spawn(DirectedService(service), rx, state.clone(), executor) {
Ok(()) => {
Ok(Buffer {
tx,
state: state,
})
},
Err(DirectedService(service)) => {
Err(SpawnError {
inner: service,
})
},
Ok(()) => Ok(Buffer { tx, state: state }),
Err(DirectedService(service)) => Err(SpawnError { inner: service }),
}
}
fn get_error_on_closed(&self) -> Arc<ServiceError<T::Error>> {
self.state
.err
.borrow()
.cloned()
.expect("Worker exited, but did not set error.")
}
}
impl<T, Request> Buffer<DirectServiceRef<T>, Request>
@@ -239,21 +256,12 @@ where
let (tx, rx) = mpsc::channel(bound);
let state = Arc::new(State {
open: AtomicBool::new(true),
err: lazycell::AtomicLazyCell::new(),
});
match Worker::spawn(service, rx, state.clone(), executor) {
Ok(()) => {
Ok(Buffer {
tx,
state: state,
})
},
Err(service) => {
Err(SpawnError {
inner: service,
})
},
Ok(()) => Ok(Buffer { tx, state: state }),
Err(service) => Err(SpawnError { inner: service }),
}
}
}
@@ -264,15 +272,13 @@ where
{
type Response = T::Response;
type Error = Error<T::Error>;
type Future = ResponseFuture<T::Future>;
type Future = ResponseFuture<T::Future, T::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// If the inner service has errored, then we error here.
if !self.state.open.load(Ordering::Acquire) {
return Err(Error::Closed);
} else {
self.tx.poll_ready().map_err(|_| Error::Closed)
}
self.tx
.poll_ready()
.map_err(move |_| Error::Closed(self.get_error_on_closed()))
}
fn call(&mut self, request: Request) -> Self::Future {
@@ -282,16 +288,21 @@ where
// outside of task context.
let (tx, rx) = oneshot::channel();
let sent = self.tx.try_send(Message { request, tx });
if sent.is_err() {
self.state.open.store(false, Ordering::Release);
ResponseFuture {
state: ResponseState::Failed,
match self.tx.try_send(Message { request, tx }) {
Err(e) => {
if e.is_disconnected() {
ResponseFuture {
state: ResponseState::Failed(self.get_error_on_closed()),
}
} else {
ResponseFuture {
state: ResponseState::Full,
}
}
}
} else {
ResponseFuture {
Ok(_) => ResponseFuture {
state: ResponseState::Rx(rx),
}
},
}
}
}
@@ -310,7 +321,7 @@ where
// ===== impl ResponseFuture =====
impl<T> Future for ResponseFuture<T>
impl<T> Future for ResponseFuture<T, T::Error>
where
T: Future,
{
@@ -324,16 +335,20 @@ where
let fut;
match self.state {
Failed => {
return Err(Error::Closed);
Full => {
return Err(Error::Full);
}
Rx(ref mut rx) => {
match rx.poll() {
Ok(Async::Ready(f)) => fut = f,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => return Err(Error::Closed),
}
Failed(ref e) => {
return Err(Error::Closed(e.clone()));
}
Rx(ref mut rx) => match rx.poll() {
Ok(Async::Ready(Ok(f))) => fut = f,
Ok(Async::Ready(Err(e))) => return Err(Error::Closed(e)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(futures::Canceled) => unreachable!(
"Worker exited without sending error to all outstanding requests."
),
},
Poll(ref mut fut) => {
return fut.poll().map_err(Error::Inner);
}
@@ -350,13 +365,19 @@ impl<T, Request> Worker<T, Request>
where
T: DirectService<Request>,
{
fn spawn<E>(service: T, rx: mpsc::Receiver<Message<Request, T::Future>>, state: Arc<State>, executor: &E) -> Result<(), T>
fn spawn<E>(
service: T,
rx: mpsc::Receiver<Message<Request, T::Future, T::Error>>,
state: Arc<State<T::Error>>,
executor: &E,
) -> Result<(), T>
where
E: WorkerExecutor<T, Request>,
{
let worker = Worker {
current_message: None,
finish: false,
failed: None,
rx,
service,
state,
@@ -374,7 +395,7 @@ where
T: DirectService<Request>,
{
/// Return the next queued Message that hasn't been canceled.
fn poll_next_msg(&mut self) -> Poll<Option<Message<Request, T::Future>>, ()> {
fn poll_next_msg(&mut self) -> Poll<Option<Message<Request, T::Future, T::Error>>, ()> {
if self.finish {
// We've already received None and are shutting down
return Ok(Async::Ready(None));
@@ -399,6 +420,35 @@ where
Ok(Async::Ready(None))
}
fn failed(&mut self, method: &'static str, error: T::Error) {
// The underlying service failed when we called `method` on it with the given `error`. We
// need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
// an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent
// requests will also fail with the same error.
// Note that we need to handle the case where some handle is concurrently trying to send us
// a request. We need to make sure that *either* the send of the request fails *or* it
// receives an error on the `oneshot` it constructed. Specifically, we want to avoid the
// case where we send errors to all outstanding requests, and *then* the caller sends its
// request. We do this by *first* exposing the error, *then* closing the channel used to
// send more requests (so the client will see the error when the send fails), and *then*
// sending the error to all outstanding requests.
let error = Arc::new(ServiceError {
method,
inner: error,
});
if let Err(_) = self.state.err.fill(error.clone()) {
// Future::poll was called after we've already errored out!
return;
}
self.rx.close();
// By closing the mpsc::Receiver, we know that poll_next_msg will soon return Ready(None),
// which will trigger the `self.finish == true` phase. We just need to make sure that any
// requests that we receive before we've exhausted the receiver receive the error:
self.failed = Some(error);
}
}
impl<T, Request> Future for Worker<T, Request>
@@ -413,6 +463,11 @@ where
loop {
match self.poll_next_msg()? {
Async::Ready(Some(msg)) => {
if let Some(ref failed) = self.failed {
let _ = msg.tx.send(Err(failed.clone()));
continue;
}
// Wait for the service to be ready
match self.service.poll_ready() {
Ok(Async::Ready(())) => {
@@ -422,7 +477,7 @@ where
//
// An error means the request had been canceled in-between
// our calls, the response future will just be dropped.
let _ = msg.tx.send(response);
let _ = msg.tx.send(Ok(response));
// Try to queue another request before we poll outstanding requests.
any_outstanding = true;
@@ -437,9 +492,12 @@ where
}
// We may want to also make progress on current requests
}
Err(_) => {
self.state.open.store(false, Ordering::Release);
return Ok(().into());
Err(e) => {
self.failed("poll_ready", e);
let _ = msg.tx.send(Err(self
.failed
.clone()
.expect("Worker::failed did not set self.failed?")));
}
}
}
@@ -447,6 +505,10 @@ where
// No more requests _ever_.
self.finish = true;
}
Async::NotReady if self.failed.is_some() => {
// No need to poll the service as it has already failed.
return Ok(Async::NotReady);
}
Async::NotReady if any_outstanding => {
// Make some progress on the service if we can.
}
@@ -459,12 +521,16 @@ where
}
if self.finish {
try_ready!(self.service.poll_close().map_err(|_| ()));
try_ready!(self.service.poll_close().map_err(move |e| {
self.failed("poll_close", e);
}));
// We are all done!
break;
} else {
debug_assert!(any_outstanding);
if let Async::Ready(()) = self.service.poll_service().map_err(|_| ())? {
if let Async::Ready(()) = self.service.poll_service().map_err(|e| {
self.failed("poll_service", e);
})? {
// Note to future iterations that there's no reason to call poll_service.
any_outstanding = false;
} else {
@@ -501,7 +567,8 @@ where
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Inner(ref why) => fmt::Display::fmt(why, f),
Error::Closed => f.pad("buffer closed"),
Error::Closed(ref e) => write!(f, "Service::{} failed: {}", e.method, e.inner),
Error::Full => f.pad("Service at capacity"),
}
}
}
@@ -511,20 +578,20 @@ where
T: error::Error,
{
fn cause(&self) -> Option<&error::Error> {
if let Error::Inner(ref why) = *self {
Some(why)
} else {
None
match *self {
Error::Inner(ref why) => Some(why),
Error::Closed(ref e) => Some(&e.inner),
Error::Full => None,
}
}
fn description(&self) -> &str {
match *self {
Error::Inner(ref e) => e.description(),
Error::Closed => "buffer closed",
Error::Closed(ref e) => e.inner.description(),
Error::Full => "Service at capacity",
}
}
}
// ===== impl SpawnError =====
@@ -549,5 +616,4 @@ where
fn description(&self) -> &str {
"error spawning buffer task"
}
}

View File

@@ -81,8 +81,31 @@ fn when_inner_is_not_ready() {
assert_eq!(res1.wait().expect("res1.wait"), "world");
}
type Mock = tower_mock::Mock<&'static str, &'static str, ()>;
type Handle = tower_mock::Handle<&'static str, &'static str, ()>;
#[test]
fn when_inner_fails() {
let (mut service, mut handle) = new_service();
// Make the service NotReady
handle.allow(0);
handle.error("foobar");
let mut res1 = service.call("hello");
// Allow the Buffer's executor to do work
::std::thread::sleep(::std::time::Duration::from_millis(100));
with_task(|| {
let e = res1.poll().unwrap_err();
if let Error::Closed(e) = e {
assert!(format!("{:?}", e).contains("poll_ready"));
assert_eq!(e.error(), &tower_mock::Error::Other("foobar"));
} else {
panic!("unexpected error type: {:?}", e);
}
});
}
type Mock = tower_mock::Mock<&'static str, &'static str, &'static str>;
type Handle = tower_mock::Handle<&'static str, &'static str, &'static str>;
struct Exec;

View File

@@ -18,7 +18,7 @@ use std::sync::{Arc, Mutex};
pub struct Mock<T, U, E> {
id: u64,
tx: Mutex<Tx<T, U, E>>,
state: Arc<Mutex<State>>,
state: Arc<Mutex<State<E>>>,
can_send: bool,
}
@@ -26,7 +26,7 @@ pub struct Mock<T, U, E> {
#[derive(Debug)]
pub struct Handle<T, U, E> {
rx: Rx<T, U, E>,
state: Arc<Mutex<State>>,
state: Arc<Mutex<State<E>>>,
}
#[derive(Debug)]
@@ -57,7 +57,7 @@ pub enum Error<T> {
}
#[derive(Debug)]
struct State {
struct State<E> {
// Tracks the number of requests that can be sent through
rem: u64,
@@ -69,6 +69,9 @@ struct State {
// Tracks the ID for the next mock clone
next_clone_id: u64,
// Tracks the next error to yield (if any)
err_with: Option<E>,
}
type Tx<T, U, E> = mpsc::UnboundedSender<Request<T, U, E>>;
@@ -116,6 +119,10 @@ impl<T, U, E> Service<T> for Mock<T, U, E> {
return Ok(().into());
}
if let Some(e) = state.err_with.take() {
return Err(Error::Other(e));
}
if state.rem > 0 {
assert!(!state.tasks.contains_key(&self.id));
@@ -236,6 +243,16 @@ impl<T, U, E> Handle<T, U, E> {
}
}
}
/// Make the next poll_ method error with the given error.
pub fn error(&mut self, e: E) {
let mut state = self.state.lock().unwrap();
state.err_with = Some(e);
for (_, task) in state.tasks.drain() {
task.notify();
}
}
}
impl<T, U, E> Drop for Handle<T, U, E> {
@@ -312,13 +329,14 @@ impl<T, E> Future for ResponseFuture<T, E> {
// ===== impl State =====
impl State {
fn new() -> State {
impl<E> State<E> {
fn new() -> State<E> {
State {
rem: u64::MAX,
tasks: HashMap::new(),
is_closed: false,
next_clone_id: 1,
err_with: None,
}
}
}