buffer: replace generic error with Box<Error> (#168)

Refs: #131
This commit is contained in:
Carl Lerche 2019-02-26 13:40:16 -08:00 committed by GitHub
parent 92653e68a6
commit 8241fe8584
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 188 additions and 94 deletions

View File

@ -18,6 +18,7 @@ use futures::{future, stream, Async, Future, Poll, Stream};
use hdrsample::Histogram;
use rand::Rng;
use std::collections::VecDeque;
use std::fmt;
use std::time::{Duration, Instant};
use tokio::{runtime, timer};
use tower_balance as lb;
@ -104,9 +105,9 @@ where
D: Discover + Send + 'static,
D::Key: Send,
D::Service: Service<Req, Response = Rsp> + Send,
D::Error: Send + Sync,
D::Error: ::std::error::Error + Send + Sync + 'static,
<D::Service as Service<Req>>::Future: Send,
<D::Service as Service<Req>>::Error: Send + Sync,
<D::Service as Service<Req>>::Error: ::std::error::Error + Send + Sync + 'static,
C: lb::Choose<D::Key, D::Service> + Send + 'static,
{
println!("{}", name);
@ -217,14 +218,14 @@ impl Future for Delay {
impl Discover for Disco {
type Key = usize;
type Error = ();
type Service = InFlightLimit<Buffer<DelayService, Req>>;
type Error = DiscoError;
type Service = Errify<InFlightLimit<Buffer<DelayService, Req>>>;
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
match self.changes.pop_front() {
Some(Change::Insert(k, svc)) => {
let svc = Buffer::new(svc, 0).unwrap();
let svc = InFlightLimit::new(svc, ENDPOINT_CAPACITY);
let svc = Errify(InFlightLimit::new(svc, ENDPOINT_CAPACITY));
Ok(Async::Ready(Change::Insert(k, svc)))
}
Some(Change::Remove(k)) => Ok(Async::Ready(Change::Remove(k))),
@ -233,12 +234,25 @@ impl Discover for Disco {
}
}
#[derive(Debug)]
pub struct DiscoError;
impl fmt::Display for DiscoError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "discovery error")
}
}
impl std::error::Error for DiscoError {}
type DemoService<D, C> = InFlightLimit<Buffer<lb::Balance<D, C>, Req>>;
struct SendRequests<D, C>
where
D: Discover,
D::Error: ::std::error::Error + Send + Sync + 'static,
D::Service: Service<Req>,
<D::Service as Service<Req>>::Error: ::std::error::Error + Send + Sync + 'static,
C: lb::Choose<D::Key, D::Service>,
{
send_remaining: usize,
@ -249,7 +263,9 @@ where
impl<D, C> SendRequests<D, C>
where
D: Discover + Send + 'static,
D::Error: ::std::error::Error + Send + Sync + 'static,
D::Service: Service<Req>,
<D::Service as Service<Req>>::Error: ::std::error::Error + Send + Sync + 'static,
D::Key: Send,
D::Service: Send,
D::Error: Send + Sync,
@ -269,7 +285,9 @@ where
impl<D, C> Stream for SendRequests<D, C>
where
D: Discover,
D::Error: ::std::error::Error + Send + Sync + 'static,
D::Service: Service<Req>,
<D::Service as Service<Req>>::Error: ::std::error::Error + Send + Sync + 'static,
C: lb::Choose<D::Key, D::Service>,
{
type Item = <DemoService<D, C> as Service<Req>>::Response;
@ -305,3 +323,31 @@ where
Ok(Async::Ready(None))
}
}
pub struct Errify<T>(T);
impl<T, Request> Service<Request> for Errify<T>
where
T: Service<Request>,
{
type Response = T::Response;
type Error = DiscoError;
type Future = Errify<T::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready().map_err(|_| DiscoError)
}
fn call(&mut self, request: Request) -> Self::Future {
Errify(self.0.call(request))
}
}
impl<T: Future> Future for Errify<T> {
type Item = T::Item;
type Error = DiscoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll().map_err(|_| DiscoError)
}
}

View File

@ -5,9 +5,9 @@ use std::sync::Arc;
/// An error produced by a `Service` wrapped by a `Buffer`
#[derive(Debug)]
pub struct ServiceError<E> {
pub struct ServiceError {
method: &'static str,
inner: E,
inner: Arc<Error>,
}
/// Error produced when spawning the worker fails
@ -17,50 +17,34 @@ pub struct SpawnError<T> {
}
/// Errors produced by `Buffer`.
#[derive(Debug)]
pub enum Error<E> {
/// The `Service` call errored.
Inner(E),
/// The underlying `Service` failed. All subsequent requests will fail.
Closed(Arc<ServiceError<E>>),
}
// ===== impl Error =====
impl<T> fmt::Display for Error<T>
where
T: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Inner(ref why) => fmt::Display::fmt(why, f),
Error::Closed(ref e) => write!(f, "Service::{} failed: {}", e.method, e.inner),
}
}
}
impl<T> std::error::Error for Error<T>
where
T: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match *self {
Error::Inner(ref why) => Some(why),
Error::Closed(ref e) => Some(&e.inner),
}
}
}
pub(crate) type Error = Box<::std::error::Error + Send + Sync>;
// ===== impl ServiceError =====
impl<E> ServiceError<E> {
pub(crate) fn new(method: &'static str, inner: E) -> ServiceError<E> {
impl ServiceError {
pub(crate) fn new(method: &'static str, inner: Error) -> ServiceError {
let inner = Arc::new(inner);
ServiceError { method, inner }
}
/// The error produced by the `Service` when `method` was called.
pub fn error(&self) -> &E {
&self.inner
/// Private to avoid exposing `Clone` trait as part of the public API
pub(crate) fn clone(&self) -> ServiceError {
ServiceError {
method: self.method,
inner: self.inner.clone(),
}
}
}
impl fmt::Display for ServiceError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "Service::{} failed: {}", self.method, self.inner)
}
}
impl std::error::Error for ServiceError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&**self.inner)
}
}

View File

@ -3,42 +3,43 @@
use error::{Error, ServiceError};
use futures::{Async, Future, Poll};
use message;
use std::sync::Arc;
/// Future eventually completed with the response to the original request.
pub struct ResponseFuture<T, E> {
state: ResponseState<T, E>,
pub struct ResponseFuture<T> {
state: ResponseState<T>,
}
enum ResponseState<T, E> {
Failed(Arc<ServiceError<E>>),
Rx(message::Rx<T, E>),
enum ResponseState<T> {
Failed(ServiceError),
Rx(message::Rx<T>),
Poll(T),
}
impl<T> ResponseFuture<T, T::Error>
impl<T> ResponseFuture<T>
where
T: Future,
T::Error: Into<Error>,
{
pub(crate) fn new(rx: message::Rx<T, T::Error>) -> Self {
pub(crate) fn new(rx: message::Rx<T>) -> Self {
ResponseFuture {
state: ResponseState::Rx(rx),
}
}
pub(crate) fn failed(err: Arc<ServiceError<T::Error>>) -> Self {
pub(crate) fn failed(err: ServiceError) -> Self {
ResponseFuture {
state: ResponseState::Failed(err),
}
}
}
impl<T> Future for ResponseFuture<T, T::Error>
impl<T> Future for ResponseFuture<T>
where
T: Future,
T::Error: Into<Error>,
{
type Item = T::Item;
type Error = Error<T::Error>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
use self::ResponseState::*;
@ -48,18 +49,18 @@ where
match self.state {
Failed(ref e) => {
return Err(Error::Closed(e.clone()));
return Err(e.clone().into());
}
Rx(ref mut rx) => match rx.poll() {
Ok(Async::Ready(Ok(f))) => fut = f,
Ok(Async::Ready(Err(e))) => return Err(Error::Closed(e)),
Ok(Async::Ready(Err(e))) => return Err(e.into()),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => unreachable!(
"Worker exited without sending error to all outstanding requests."
),
},
Poll(ref mut fut) => {
return fut.poll().map_err(Error::Inner);
return fut.poll().map_err(Into::into);
}
}

View File

@ -36,13 +36,14 @@ pub struct Buffer<T, Request>
where
T: Service<Request>,
{
tx: mpsc::Sender<Message<Request, T::Future, T::Error>>,
worker: worker::Handle<T::Error>,
tx: mpsc::Sender<Message<Request, T::Future>>,
worker: worker::Handle,
}
impl<T, Request> Buffer<T, Request>
where
T: Service<Request>,
T::Error: Into<Error>,
{
/// Creates a new `Buffer` wrapping `service`.
///
@ -85,16 +86,17 @@ where
impl<T, Request> Service<Request> for Buffer<T, Request>
where
T: Service<Request>,
T::Error: Into<Error>,
{
type Response = T::Response;
type Error = Error<T::Error>;
type Future = ResponseFuture<T::Future, T::Error>;
type Error = Error;
type Future = ResponseFuture<T::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// If the inner service has errored, then we error here.
self.tx
.poll_ready()
.map_err(move |_| Error::Closed(self.worker.get_error_on_closed()))
.map_err(move |_| self.worker.get_error_on_closed().into())
}
fn call(&mut self, request: Request) -> Self::Future {

View File

@ -1,16 +1,15 @@
use error::ServiceError;
use std::sync::Arc;
use tokio_sync::oneshot;
/// Message sent over buffer
#[derive(Debug)]
pub(crate) struct Message<Request, Fut, E> {
pub(crate) struct Message<Request, Fut> {
pub(crate) request: Request,
pub(crate) tx: Tx<Fut, E>,
pub(crate) tx: Tx<Fut>,
}
/// Response sender
pub(crate) type Tx<Fut, E> = oneshot::Sender<Result<Fut, Arc<ServiceError<E>>>>;
pub(crate) type Tx<Fut> = oneshot::Sender<Result<Fut, ServiceError>>;
/// Response receiver
pub(crate) type Rx<Fut, E> = oneshot::Receiver<Result<Fut, Arc<ServiceError<E>>>>;
pub(crate) type Rx<Fut> = oneshot::Receiver<Result<Fut, ServiceError>>;

View File

@ -1,4 +1,4 @@
use error::ServiceError;
use error::{Error, ServiceError};
use futures::future::Executor;
use futures::{Async, Future, Poll, Stream};
use message::Message;
@ -16,18 +16,19 @@ use tower_service::Service;
pub struct Worker<T, Request>
where
T: Service<Request>,
T::Error: Into<Error>,
{
current_message: Option<Message<Request, T::Future, T::Error>>,
rx: mpsc::Receiver<Message<Request, T::Future, T::Error>>,
current_message: Option<Message<Request, T::Future>>,
rx: mpsc::Receiver<Message<Request, T::Future>>,
service: T,
finish: bool,
failed: Option<Arc<ServiceError<T::Error>>>,
handle: Handle<T::Error>,
failed: Option<ServiceError>,
handle: Handle,
}
/// Get the error out
pub(crate) struct Handle<E> {
inner: Arc<Mutex<Option<Arc<ServiceError<E>>>>>,
pub(crate) struct Handle {
inner: Arc<Mutex<Option<ServiceError>>>,
}
/// This trait allows you to use either Tokio's threaded runtime's executor or the `current_thread`
@ -35,23 +36,27 @@ pub(crate) struct Handle<E> {
pub trait WorkerExecutor<T, Request>: Executor<Worker<T, Request>>
where
T: Service<Request>,
T::Error: Into<Error>,
{
}
impl<T, Request, E: Executor<Worker<T, Request>>> WorkerExecutor<T, Request> for E where
T: Service<Request>
impl<T, Request, E: Executor<Worker<T, Request>>> WorkerExecutor<T, Request> for E
where
T: Service<Request>,
T::Error: Into<Error>,
{
}
impl<T, Request> Worker<T, Request>
where
T: Service<Request>,
T::Error: Into<Error>,
{
pub(crate) fn spawn<E>(
service: T,
rx: mpsc::Receiver<Message<Request, T::Future, T::Error>>,
rx: mpsc::Receiver<Message<Request, T::Future>>,
executor: &E,
) -> Result<Handle<T::Error>, T>
) -> Result<Handle, T>
where
E: WorkerExecutor<T, Request>,
{
@ -75,7 +80,7 @@ where
}
/// Return the next queued Message that hasn't been canceled.
fn poll_next_msg(&mut self) -> Poll<Option<Message<Request, T::Future, T::Error>>, ()> {
fn poll_next_msg(&mut self) -> Poll<Option<Message<Request, T::Future>>, ()> {
if self.finish {
// We've already received None and are shutting down
return Ok(Async::Ready(None));
@ -114,7 +119,7 @@ where
// request. We do this by *first* exposing the error, *then* closing the channel used to
// send more requests (so the client will see the error when the send fails), and *then*
// sending the error to all outstanding requests.
let error = Arc::new(ServiceError::new(method, error));
let error = ServiceError::new(method, error.into());
let mut inner = self.handle.inner.lock().unwrap();
@ -138,6 +143,7 @@ where
impl<T, Request> Future for Worker<T, Request>
where
T: Service<Request>,
T::Error: Into<Error>,
{
type Item = ();
type Error = ();
@ -176,8 +182,9 @@ where
self.failed("poll_ready", e);
let _ = msg.tx.send(Err(self
.failed
.clone()
.expect("Worker::failed did not set self.failed?")));
.as_ref()
.expect("Worker::failed did not set self.failed?")
.clone()));
}
}
}
@ -200,8 +207,8 @@ where
}
}
impl<E> Handle<E> {
pub(crate) fn get_error_on_closed(&self) -> Arc<ServiceError<E>> {
impl Handle {
pub(crate) fn get_error_on_closed(&self) -> ServiceError {
self.inner
.lock()
.unwrap()
@ -211,8 +218,8 @@ impl<E> Handle<E> {
}
}
impl<E> Clone for Handle<E> {
fn clone(&self) -> Handle<E> {
impl Clone for Handle {
fn clone(&self) -> Handle {
Handle {
inner: self.inner.clone(),
}

View File

@ -7,6 +7,7 @@ use futures::prelude::*;
use tower_buffer::*;
use tower_service::*;
use std::fmt;
use std::thread;
#[test]
@ -83,11 +84,13 @@ fn when_inner_is_not_ready() {
#[test]
fn when_inner_fails() {
use std::error::Error as StdError;
let (mut service, mut handle) = new_service();
// Make the service NotReady
handle.allow(0);
handle.error("foobar");
handle.error(Error("foobar"));
let mut res1 = service.call("hello");
@ -95,17 +98,42 @@ fn when_inner_fails() {
::std::thread::sleep(::std::time::Duration::from_millis(100));
with_task(|| {
let e = res1.poll().unwrap_err();
if let error::Error::Closed(e) = e {
assert!(format!("{:?}", e).contains("poll_ready"));
assert_eq!(e.error(), &tower_mock::Error::Other("foobar"));
if let Some(e) = e.downcast_ref::<error::ServiceError>() {
assert!(format!("{}", e).contains("poll_ready"));
let e = e
.source()
.expect("nope 1")
.downcast_ref::<tower_mock::Error<Error>>()
.expect("nope 1_2");
match e {
tower_mock::Error::Other(e) => assert_eq!(e.0, "foobar"),
_ => panic!("unexpected mock error"),
}
} else {
panic!("unexpected error type: {:?}", e);
}
});
}
type Mock = tower_mock::Mock<&'static str, &'static str, &'static str>;
type Handle = tower_mock::Handle<&'static str, &'static str, &'static str>;
#[derive(Debug)]
struct Error(&'static str);
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
self.0.fmt(fmt)
}
}
impl ::std::error::Error for Error {
fn source(&self) -> Option<&(::std::error::Error + 'static)> {
None
}
}
type Mock = tower_mock::Mock<&'static str, &'static str, Error>;
type Handle = tower_mock::Handle<&'static str, &'static str, Error>;
struct Exec;

View File

@ -10,6 +10,7 @@ use futures::task::{self, Task};
use futures::{Async, Future, Poll, Stream};
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::{ops, u64};
@ -336,6 +337,32 @@ impl<T, E> Future for ResponseFuture<T, E> {
}
}
// ===== impl Error =====
impl<T> fmt::Display for Error<T>
where
T: fmt::Display,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Closed => write!(fmt, "mock service is closed"),
Error::Other(e) => e.fmt(fmt),
}
}
}
impl<T> std::error::Error for Error<T>
where
T: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::Closed => None,
Error::Other(e) => Some(e),
}
}
}
// ===== impl State =====
impl<E> State<E> {