From d8e6d6499be393448b91d6288fd819643dd1b224 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 22 Apr 2019 13:06:20 -0700 Subject: [PATCH] buffer: make Buffer::new infallible (#257) Creating a buffer can internally fail to spawn a worker. Before, that error was returned immediately from `Buffer::new`. This changes `new` to always return a `Buffer`, and the spawn error is encountered via `poll_ready`. --- tower-buffer/src/layer.rs | 6 +++- tower-buffer/src/service.rs | 27 ++++++++++++------ tower-buffer/src/worker.rs | 8 +++--- tower-buffer/tests/buffer.rs | 53 ++++++++++++++++++++++++++++-------- 4 files changed, 68 insertions(+), 26 deletions(-) diff --git a/tower-buffer/src/layer.rs b/tower-buffer/src/layer.rs index 8b82145..900f739 100644 --- a/tower-buffer/src/layer.rs +++ b/tower-buffer/src/layer.rs @@ -41,6 +41,10 @@ where type Service = Buffer; fn layer(&self, service: S) -> Result { - Buffer::with_executor(service, self.bound, &mut self.executor.clone()) + Ok(Buffer::with_executor( + service, + self.bound, + &mut self.executor.clone(), + )) } } diff --git a/tower-buffer/src/service.rs b/tower-buffer/src/service.rs index 82f804a..337a95b 100644 --- a/tower-buffer/src/service.rs +++ b/tower-buffer/src/service.rs @@ -1,5 +1,5 @@ use crate::{ - error::Error, + error::{Error, SpawnError}, future::ResponseFuture, message::Message, worker::{Handle, Worker, WorkerExecutor}, @@ -18,7 +18,7 @@ where T: Service, { tx: mpsc::Sender>, - worker: Handle, + worker: Option, } impl Buffer @@ -33,7 +33,7 @@ where /// /// The default Tokio executor is used to run the given service, which means that this method /// must be called while on the Tokio runtime. - pub fn new(service: T, bound: usize) -> Result + pub fn new(service: T, bound: usize) -> Self where T: Send + 'static, T::Future: Send, @@ -51,13 +51,24 @@ where /// /// `bound` gives the maximal number of requests that can be queued for the service before /// backpressure is applied to callers. - pub fn with_executor(service: T, bound: usize, executor: &mut E) -> Result + pub fn with_executor(service: T, bound: usize, executor: &mut E) -> Self where E: WorkerExecutor, { let (tx, rx) = mpsc::channel(bound); + let worker = Worker::spawn(service, rx, executor); + Buffer { tx, worker } + } - Worker::spawn(service, rx, executor).map(|worker| Buffer { tx, worker }) + fn get_worker_error(&self) -> Error { + self.worker + .as_ref() + .map(|w| w.get_error_on_closed()) + .unwrap_or_else(|| { + // If there's no worker handle, that's because spawning it + // at the beginning failed. + SpawnError::new().into() + }) } } @@ -72,9 +83,7 @@ where fn poll_ready(&mut self) -> Poll<(), Self::Error> { // If the inner service has errored, then we error here. - self.tx - .poll_ready() - .map_err(|_| self.worker.get_error_on_closed()) + self.tx.poll_ready().map_err(|_| self.get_worker_error()) } fn call(&mut self, request: Request) -> Self::Future { @@ -87,7 +96,7 @@ where match self.tx.try_send(Message { request, tx }) { Err(e) => { if e.is_closed() { - ResponseFuture::failed(self.worker.get_error_on_closed()) + ResponseFuture::failed(self.get_worker_error()) } else { // When `mpsc::Sender::poll_ready` returns `Ready`, a slot // in the channel is reserved for the handle. Other `Sender` diff --git a/tower-buffer/src/worker.rs b/tower-buffer/src/worker.rs index 52a4f36..a9955a8 100644 --- a/tower-buffer/src/worker.rs +++ b/tower-buffer/src/worker.rs @@ -1,5 +1,5 @@ use crate::{ - error::{Closed, Error, ServiceError, SpawnError}, + error::{Closed, Error, ServiceError}, message::Message, }; use futures::{try_ready, Async, Future, Poll, Stream}; @@ -58,7 +58,7 @@ where service: T, rx: mpsc::Receiver>, executor: &mut E, - ) -> Result + ) -> Option where E: WorkerExecutor, { @@ -76,8 +76,8 @@ where }; match executor.spawn(worker) { - Ok(()) => Ok(handle), - Err(_) => Err(SpawnError::new().into()), + Ok(()) => Some(handle), + Err(_) => None, } } diff --git a/tower-buffer/tests/buffer.rs b/tower-buffer/tests/buffer.rs index 3f61b6e..196918b 100644 --- a/tower-buffer/tests/buffer.rs +++ b/tower-buffer/tests/buffer.rs @@ -99,20 +99,46 @@ fn when_inner_fails() { }); } +#[test] +fn when_spawn_fails() { + let (service, _handle) = mock::pair::<(), ()>(); + + let mut exec = ExecFn(|_| Err(())); + + let mut service = Buffer::with_executor(service, 1, &mut exec); + + let err = with_task(|| { + service + .poll_ready() + .expect_err("buffer poll_ready should error") + }); + + assert!( + err.is::(), + "should be a SpawnError: {:?}", + err + ); +} + #[test] fn poll_ready_when_worker_is_dropped_early() { let (service, _handle) = mock::pair::<(), ()>(); // drop that worker right on the floor! - let mut exec = ExecFn(drop); + let mut exec = ExecFn(|fut| { + drop(fut); + Ok(()) + }); - let mut service = Buffer::with_executor(service, 1, &mut exec).unwrap(); + let mut service = Buffer::with_executor(service, 1, &mut exec); - with_task(|| { + let err = with_task(|| { service .poll_ready() - .expect_err("buffer poll_ready should error"); + .expect_err("buffer poll_ready should error") }); + + assert!(err.is::(), "should be a Closed: {:?}", err); } #[test] @@ -121,9 +147,12 @@ fn response_future_when_worker_is_dropped_early() { // hold the worker in a cell until we want to drop it later let cell = RefCell::new(None); - let mut exec = ExecFn(|fut| *cell.borrow_mut() = Some(fut)); + let mut exec = ExecFn(|fut| { + *cell.borrow_mut() = Some(fut); + Ok(()) + }); - let mut service = Buffer::with_executor(service, 1, &mut exec).unwrap(); + let mut service = Buffer::with_executor(service, 1, &mut exec); // keep the request in the worker handle.allow(0); @@ -132,7 +161,8 @@ fn response_future_when_worker_is_dropped_early() { // drop the worker (like an executor closing up) cell.borrow_mut().take(); - response.wait().expect_err("res.wait"); + let err = response.wait().expect_err("res.wait"); + assert!(err.is::(), "should be a Closed: {:?}", err); } type Mock = mock::Mock<&'static str, &'static str>; @@ -156,23 +186,22 @@ struct ExecFn(Func); impl TypedExecutor for ExecFn where - Func: Fn(F), + Func: Fn(F) -> Result<(), ()>, F: Future + Send + 'static, { fn spawn(&mut self, fut: F) -> Result<(), SpawnError> { - (self.0)(fut); - Ok(()) + (self.0)(fut).map_err(|()| SpawnError::shutdown()) } } fn new_service() -> (Buffer, Handle) { let (service, handle) = mock::pair(); // bound is >0 here because clears_canceled_requests needs multiple outstanding requests - let service = Buffer::with_executor(service, 10, &mut Exec).unwrap(); + let service = Buffer::with_executor(service, 10, &mut Exec); (service, handle) } fn with_task U, U>(f: F) -> U { - use futures::future::{lazy, Future}; + use futures::future::lazy; lazy(|| Ok::<_, ()>(f())).wait().unwrap() }