buffer: make Buffer::new infallible (#257)

Creating a buffer can fail internally if the worker task cannot be spawned. Previously, that
error was returned immediately from `Buffer::new`. This changes `new` to
always return a `Buffer`; any spawn error is instead surfaced later through
`poll_ready`.
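
For illustration, a minimal sketch of the new calling pattern, assuming the futures 0.1 / tokio 0.1
APIs this crate targeted at the time and the tower-buffer crate layout. `Echo` is a stand-in service
invented for this example; only `Buffer::new` and `poll_ready` come from tower-buffer itself.

    use std::io;

    use futures::{future, Async, Poll};
    use tower_buffer::Buffer;
    use tower_service::Service;

    /// Stand-in service for the sketch: echoes its request back.
    struct Echo;

    impl Service<&'static str> for Echo {
        type Response = &'static str;
        // Any error type convertible into a boxed error works here.
        type Error = io::Error;
        type Future = future::FutureResult<Self::Response, Self::Error>;

        fn poll_ready(&mut self) -> Poll<(), Self::Error> {
            Ok(Async::Ready(()))
        }

        fn call(&mut self, req: &'static str) -> Self::Future {
            future::ok(req)
        }
    }

    fn main() {
        tokio::run(future::lazy(|| {
            // `Buffer::new` no longer returns a `Result`; the worker is spawned
            // onto the default Tokio executor, so this must run on the runtime.
            let mut buffer: Buffer<Echo, &'static str> = Buffer::new(Echo, 10);

            // A failed worker spawn now surfaces here instead of at construction,
            // as a boxed error that downcasts to `error::SpawnError`.
            match buffer.poll_ready() {
                Ok(_) => println!("buffer ready: worker spawned"),
                Err(e) => println!("buffer unusable: {:?}", e),
            }
            Ok::<_, ()>(())
        }));
    }
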
Sean McArthur 2019-04-22 13:06:20 -07:00 committed by GitHub
parent cc58745952
commit d8e6d6499b
4 changed files with 68 additions and 26 deletions


@@ -41,6 +41,10 @@ where
     type Service = Buffer<S, Request>;

     fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
-        Buffer::with_executor(service, self.bound, &mut self.executor.clone())
+        Ok(Buffer::with_executor(
+            service,
+            self.bound,
+            &mut self.executor.clone(),
+        ))
     }
 }


@ -1,5 +1,5 @@
use crate::{ use crate::{
error::Error, error::{Error, SpawnError},
future::ResponseFuture, future::ResponseFuture,
message::Message, message::Message,
worker::{Handle, Worker, WorkerExecutor}, worker::{Handle, Worker, WorkerExecutor},
@ -18,7 +18,7 @@ where
T: Service<Request>, T: Service<Request>,
{ {
tx: mpsc::Sender<Message<Request, T::Future>>, tx: mpsc::Sender<Message<Request, T::Future>>,
worker: Handle, worker: Option<Handle>,
} }
impl<T, Request> Buffer<T, Request> impl<T, Request> Buffer<T, Request>
@ -33,7 +33,7 @@ where
/// ///
/// The default Tokio executor is used to run the given service, which means that this method /// The default Tokio executor is used to run the given service, which means that this method
/// must be called while on the Tokio runtime. /// must be called while on the Tokio runtime.
pub fn new(service: T, bound: usize) -> Result<Self, Error> pub fn new(service: T, bound: usize) -> Self
where where
T: Send + 'static, T: Send + 'static,
T::Future: Send, T::Future: Send,
@ -51,13 +51,24 @@ where
/// ///
/// `bound` gives the maximal number of requests that can be queued for the service before /// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers. /// backpressure is applied to callers.
pub fn with_executor<E>(service: T, bound: usize, executor: &mut E) -> Result<Self, Error> pub fn with_executor<E>(service: T, bound: usize, executor: &mut E) -> Self
where where
E: WorkerExecutor<T, Request>, E: WorkerExecutor<T, Request>,
{ {
let (tx, rx) = mpsc::channel(bound); let (tx, rx) = mpsc::channel(bound);
let worker = Worker::spawn(service, rx, executor);
Buffer { tx, worker }
}
Worker::spawn(service, rx, executor).map(|worker| Buffer { tx, worker }) fn get_worker_error(&self) -> Error {
self.worker
.as_ref()
.map(|w| w.get_error_on_closed())
.unwrap_or_else(|| {
// If there's no worker handle, that's because spawning it
// at the beginning failed.
SpawnError::new().into()
})
} }
} }
@ -72,9 +83,7 @@ where
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// If the inner service has errored, then we error here. // If the inner service has errored, then we error here.
self.tx self.tx.poll_ready().map_err(|_| self.get_worker_error())
.poll_ready()
.map_err(|_| self.worker.get_error_on_closed())
} }
fn call(&mut self, request: Request) -> Self::Future { fn call(&mut self, request: Request) -> Self::Future {
@ -87,7 +96,7 @@ where
match self.tx.try_send(Message { request, tx }) { match self.tx.try_send(Message { request, tx }) {
Err(e) => { Err(e) => {
if e.is_closed() { if e.is_closed() {
ResponseFuture::failed(self.worker.get_error_on_closed()) ResponseFuture::failed(self.get_worker_error())
} else { } else {
// When `mpsc::Sender::poll_ready` returns `Ready`, a slot // When `mpsc::Sender::poll_ready` returns `Ready`, a slot
// in the channel is reserved for the handle. Other `Sender` // in the channel is reserved for the handle. Other `Sender`


@@ -1,5 +1,5 @@
 use crate::{
-    error::{Closed, Error, ServiceError, SpawnError},
+    error::{Closed, Error, ServiceError},
     message::Message,
 };
 use futures::{try_ready, Async, Future, Poll, Stream};
@@ -58,7 +58,7 @@ where
         service: T,
         rx: mpsc::Receiver<Message<Request, T::Future>>,
         executor: &mut E,
-    ) -> Result<Handle, Error>
+    ) -> Option<Handle>
     where
         E: WorkerExecutor<T, Request>,
     {
@@ -76,8 +76,8 @@ where
         };

         match executor.spawn(worker) {
-            Ok(()) => Ok(handle),
-            Err(_) => Err(SpawnError::new().into()),
+            Ok(()) => Some(handle),
+            Err(_) => None,
         }
     }


@@ -99,20 +99,46 @@ fn when_inner_fails() {
     });
 }

+#[test]
+fn when_spawn_fails() {
+    let (service, _handle) = mock::pair::<(), ()>();
+
+    let mut exec = ExecFn(|_| Err(()));
+
+    let mut service = Buffer::with_executor(service, 1, &mut exec);
+
+    let err = with_task(|| {
+        service
+            .poll_ready()
+            .expect_err("buffer poll_ready should error")
+    });
+
+    assert!(
+        err.is::<error::SpawnError>(),
+        "should be a SpawnError: {:?}",
+        err
+    );
+}
+
 #[test]
 fn poll_ready_when_worker_is_dropped_early() {
     let (service, _handle) = mock::pair::<(), ()>();

     // drop that worker right on the floor!
-    let mut exec = ExecFn(drop);
+    let mut exec = ExecFn(|fut| {
+        drop(fut);
+        Ok(())
+    });

-    let mut service = Buffer::with_executor(service, 1, &mut exec).unwrap();
+    let mut service = Buffer::with_executor(service, 1, &mut exec);

-    with_task(|| {
+    let err = with_task(|| {
         service
             .poll_ready()
-            .expect_err("buffer poll_ready should error");
+            .expect_err("buffer poll_ready should error")
     });
+
+    assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
 }

 #[test]
@@ -121,9 +147,12 @@ fn response_future_when_worker_is_dropped_early() {
     // hold the worker in a cell until we want to drop it later
     let cell = RefCell::new(None);
-    let mut exec = ExecFn(|fut| *cell.borrow_mut() = Some(fut));
+    let mut exec = ExecFn(|fut| {
+        *cell.borrow_mut() = Some(fut);
+        Ok(())
+    });

-    let mut service = Buffer::with_executor(service, 1, &mut exec).unwrap();
+    let mut service = Buffer::with_executor(service, 1, &mut exec);

     // keep the request in the worker
     handle.allow(0);
@@ -132,7 +161,8 @@ fn response_future_when_worker_is_dropped_early() {
     // drop the worker (like an executor closing up)
     cell.borrow_mut().take();

-    response.wait().expect_err("res.wait");
+    let err = response.wait().expect_err("res.wait");
+    assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
 }

 type Mock = mock::Mock<&'static str, &'static str>;
@@ -156,23 +186,22 @@ struct ExecFn<Func>(Func);
 impl<Func, F> TypedExecutor<F> for ExecFn<Func>
 where
-    Func: Fn(F),
+    Func: Fn(F) -> Result<(), ()>,
     F: Future<Item = (), Error = ()> + Send + 'static,
 {
     fn spawn(&mut self, fut: F) -> Result<(), SpawnError> {
-        (self.0)(fut);
-        Ok(())
+        (self.0)(fut).map_err(|()| SpawnError::shutdown())
     }
 }

 fn new_service() -> (Buffer<Mock, &'static str>, Handle) {
     let (service, handle) = mock::pair();
     // bound is >0 here because clears_canceled_requests needs multiple outstanding requests
-    let service = Buffer::with_executor(service, 10, &mut Exec).unwrap();
+    let service = Buffer::with_executor(service, 10, &mut Exec);
     (service, handle)
 }

 fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
-    use futures::future::{lazy, Future};
+    use futures::future::lazy;
     lazy(|| Ok::<_, ()>(f())).wait().unwrap()
 }