From f42338934a95dbdf0d37824eb210bca3a65c20d4 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 21 Feb 2019 12:18:56 -0800 Subject: [PATCH] Require `poll_ready` to be called before `call` (#161) This updates the `Service` contract requiring `poll_ready` to be called before `call`. This allows `Service::call` to panic in the event the user of the service omits `poll_ready` or does not wait until `Ready` is observed. --- tower-balance/src/lib.rs | 4 -- tower-buffer/src/lib.rs | 20 +++----- tower-filter/src/lib.rs | 24 ++++----- tower-in-flight-limit/src/lib.rs | 50 +++++-------------- .../tests/in_flight_limit.rs | 2 + tower-mock/src/lib.rs | 30 ++++++++--- tower-mock/tests/mock.rs | 11 +--- tower-rate-limit/src/lib.rs | 17 +++---- tower-rate-limit/tests/rate_limit.rs | 13 ++--- tower-reconnect/src/lib.rs | 17 ++----- tower-service/src/lib.rs | 22 +++++--- 11 files changed, 87 insertions(+), 123 deletions(-) diff --git a/tower-balance/src/lib.rs b/tower-balance/src/lib.rs index e7482bc..cdd4f67 100644 --- a/tower-balance/src/lib.rs +++ b/tower-balance/src/lib.rs @@ -53,7 +53,6 @@ pub struct Balance { pub enum Error { Inner(T), Balance(U), - NotReady, } pub struct ResponseFuture(F, PhantomData); @@ -357,7 +356,6 @@ where match *self { Error::Inner(ref why) => fmt::Display::fmt(why, f), Error::Balance(ref why) => write!(f, "load balancing failed: {}", why), - Error::NotReady => f.pad("not ready"), } } } @@ -371,7 +369,6 @@ where match *self { Error::Inner(ref why) => Some(why), Error::Balance(ref why) => Some(why), - _ => None, } } @@ -379,7 +376,6 @@ where match *self { Error::Inner(_) => "inner service error", Error::Balance(_) => "load balancing failed", - Error::NotReady => "not ready", } } } diff --git a/tower-buffer/src/lib.rs b/tower-buffer/src/lib.rs index 2e9bd52..6c621ea 100644 --- a/tower-buffer/src/lib.rs +++ b/tower-buffer/src/lib.rs @@ -58,8 +58,6 @@ pub enum Error { Inner(E), /// The underlying `Service` failed. All subsequent requests will fail. Closed(Arc>), - /// The underlying `Service` is currently at capacity; wait for `poll_ready`. - Full, } mod sealed { @@ -113,7 +111,6 @@ struct State { } enum ResponseState { - Full, Failed(Arc>), Rx(oneshot::Receiver>>>), Poll(T), @@ -202,9 +199,14 @@ where state: ResponseState::Failed(self.get_error_on_closed()), } } else { - ResponseFuture { - state: ResponseState::Full, - } + // When `mpsc::Sender::poll_ready` returns `Ready`, a slot + // in the channel is reserved for the handle. Other `Sender` + // handles may not send a message using that slot. This + // guarantees capacity for `request`. + // + // Given this, the only way to hit this code path is if + // `poll_ready` has not been called & `Ready` returned. + panic!("buffer full; poll_ready must be called first"); } } Ok(_) => ResponseFuture { @@ -242,9 +244,6 @@ where let fut; match self.state { - Full => { - return Err(Error::Full); - } Failed(ref e) => { return Err(Error::Closed(e.clone())); } @@ -433,7 +432,6 @@ where match *self { Error::Inner(ref why) => fmt::Display::fmt(why, f), Error::Closed(ref e) => write!(f, "Service::{} failed: {}", e.method, e.inner), - Error::Full => f.pad("Service at capacity"), } } } @@ -446,7 +444,6 @@ where match *self { Error::Inner(ref why) => Some(why), Error::Closed(ref e) => Some(&e.inner), - Error::Full => None, } } @@ -454,7 +451,6 @@ where match *self { Error::Inner(ref e) => e.description(), Error::Closed(ref e) => e.inner.description(), - Error::Full => "Service as capacity", } } } diff --git a/tower-filter/src/lib.rs b/tower-filter/src/lib.rs index f35fa6c..76b51bd 100644 --- a/tower-filter/src/lib.rs +++ b/tower-filter/src/lib.rs @@ -25,7 +25,7 @@ pub struct ResponseFuture where S: Service, { - inner: Option>, + inner: ResponseInner, } #[derive(Debug)] @@ -47,9 +47,6 @@ pub enum Error { /// The inner service produced an error. Inner(U), - - /// The service is out of capacity. - NoCapacity, } /// Checks a request @@ -74,7 +71,7 @@ enum State { Check(Request), WaitReady(Request), WaitResponse(U), - NoCapacity, + Invalid, } // ===== impl Filter ===== @@ -125,7 +122,7 @@ where let rem = self.counts.rem.load(SeqCst); if rem == 0 { - return ResponseFuture { inner: None }; + panic!("service not ready; poll_ready must be called first"); } // Decrement @@ -141,12 +138,12 @@ where let counts = self.counts.clone(); ResponseFuture { - inner: Some(ResponseInner { + inner: ResponseInner { state: State::Check(request), check, service, counts, - }), + }, } } } @@ -177,10 +174,7 @@ where type Error = Error; fn poll(&mut self) -> Poll { - match self.inner { - Some(ref mut inner) => inner.poll(), - None => Err(Error::NoCapacity), - } + self.inner.poll() } } @@ -215,7 +209,7 @@ where use self::State::*; loop { - match mem::replace(&mut self.state, NoCapacity) { + match mem::replace(&mut self.state, Invalid) { Check(request) => { // Poll predicate match self.check.poll() { @@ -258,8 +252,8 @@ where return ret; } - NoCapacity => { - return Err(Error::NoCapacity); + Invalid => { + panic!("invalid state"); } } } diff --git a/tower-in-flight-limit/src/lib.rs b/tower-in-flight-limit/src/lib.rs index 13d3262..dadebe8 100644 --- a/tower-in-flight-limit/src/lib.rs +++ b/tower-in-flight-limit/src/lib.rs @@ -22,13 +22,12 @@ pub struct InFlightLimit { /// Error returned when the service has reached its limit. #[derive(Debug)] pub enum Error { - NoCapacity, Upstream(T), } #[derive(Debug)] pub struct ResponseFuture { - inner: Option, + inner: T, shared: Arc, } @@ -114,15 +113,12 @@ where } else { // Try to reserve if !self.state.shared.reserve() { - return ResponseFuture { - inner: None, - shared: self.state.shared.clone(), - }; + panic!("service not ready; call poll_ready first"); } } ResponseFuture { - inner: Some(self.inner.call(request)), + inner: self.inner.call(request), shared: self.state.shared.clone(), } } @@ -140,35 +136,19 @@ where fn poll(&mut self) -> Poll { use futures::Async::*; - let res = match self.inner { - Some(ref mut f) => match f.poll() { - Ok(Ready(v)) => { - self.shared.release(); - Ok(Ready(v)) - } - Ok(NotReady) => { - return Ok(NotReady); - } - Err(e) => { - self.shared.release(); - Err(Error::Upstream(e)) - } - }, - None => Err(Error::NoCapacity), - }; - - // Drop the inner future - self.inner = None; - - res + match self.inner.poll() { + Ok(Ready(v)) => Ok(Ready(v)), + Ok(NotReady) => { + return Ok(NotReady); + } + Err(e) => Err(Error::Upstream(e)), + } } } impl Drop for ResponseFuture { fn drop(&mut self) { - if self.inner.is_some() { - self.shared.release(); - } + self.shared.release(); } } @@ -238,7 +218,6 @@ where fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { Error::Upstream(ref why) => fmt::Display::fmt(why, f), - Error::NoCapacity => write!(f, "in-flight limit exceeded"), } } } @@ -248,17 +227,14 @@ where T: error::Error, { fn cause(&self) -> Option<&error::Error> { - if let Error::Upstream(ref why) = *self { - Some(why) - } else { - None + match *self { + Error::Upstream(ref why) => Some(why), } } fn description(&self) -> &str { match *self { Error::Upstream(_) => "upstream service error", - Error::NoCapacity => "in-flight limit exceeded", } } } diff --git a/tower-in-flight-limit/tests/in_flight_limit.rs b/tower-in-flight-limit/tests/in_flight_limit.rs index eda033b..0bc3d50 100644 --- a/tower-in-flight-limit/tests/in_flight_limit.rs +++ b/tower-in-flight-limit/tests/in_flight_limit.rs @@ -68,6 +68,7 @@ fn basic_service_limit_functionality_with_poll_ready() { } #[test] +#[should_panic] fn basic_service_limit_functionality_without_poll_ready() { let mut task = MockTask::new(); @@ -112,6 +113,7 @@ fn basic_service_limit_functionality_without_poll_ready() { } #[test] +#[should_panic] fn request_without_capacity() { let mut task = MockTask::new(); diff --git a/tower-mock/src/lib.rs b/tower-mock/src/lib.rs index 7f62032..b7fc9d6 100644 --- a/tower-mock/src/lib.rs +++ b/tower-mock/src/lib.rs @@ -52,7 +52,6 @@ pub struct ResponseFuture { #[derive(Debug, PartialEq)] pub enum Error { Closed, - NoCapacity, Other(T), } @@ -148,9 +147,7 @@ impl Service for Mock { if !self.can_send { if state.rem == 0 { - return ResponseFuture { - rx: Error::NoCapacity, - }; + panic!("service not ready; poll_ready must be called first"); } } @@ -206,7 +203,17 @@ impl Clone for Mock { impl Drop for Mock { fn drop(&mut self) { - let mut state = self.state.lock().unwrap(); + let mut state = match self.state.lock() { + Ok(v) => v, + Err(e) => { + if ::std::thread::panicking() { + return; + } + + panic!("{:?}", e); + } + }; + state.tasks.remove(&self.id); } } @@ -252,7 +259,17 @@ impl Handle { impl Drop for Handle { fn drop(&mut self) { - let mut state = self.state.lock().unwrap(); + let mut state = match self.state.lock() { + Ok(v) => v, + Err(e) => { + if ::std::thread::panicking() { + return; + } + + panic!("{:?}", e); + } + }; + state.is_closed = true; for (_, task) in state.tasks.drain() { @@ -314,7 +331,6 @@ impl Future for ResponseFuture { Ok(Async::NotReady) => Ok(Async::NotReady), Err(_) => Err(Error::Closed), }, - Error::NoCapacity => Err(Error::NoCapacity), Error::Closed => Err(Error::Closed), } } diff --git a/tower-mock/tests/mock.rs b/tower-mock/tests/mock.rs index 23339c4..0bd0b7f 100644 --- a/tower-mock/tests/mock.rs +++ b/tower-mock/tests/mock.rs @@ -35,6 +35,7 @@ fn single_request_ready() { } #[test] +#[should_panic] fn backpressure() { let (mut mock, mut handle) = new_mock(); @@ -46,15 +47,7 @@ fn backpressure() { }); // Try to send a request - let response = mock.call("hello?".into()); - - // Did not send - with_task(|| { - assert!(handle.poll_request().unwrap().is_not_ready()); - }); - - // Response is an error - assert!(response.wait().is_err()); + mock.call("hello?".into()); } type Mock = tower_mock::Mock; diff --git a/tower-rate-limit/src/lib.rs b/tower-rate-limit/src/lib.rs index b7acbd3..f9136f4 100644 --- a/tower-rate-limit/src/lib.rs +++ b/tower-rate-limit/src/lib.rs @@ -1,7 +1,5 @@ -//! Tower middleware that applies a timeout to requests. -//! -//! If the response does not complete within the specified timeout, the response -//! will be aborted. +//! A Tower middleware that rate limits the requests that are passed to the +//! inner service. #[macro_use] extern crate futures; @@ -38,7 +36,7 @@ pub enum Error { } pub struct ResponseFuture { - inner: Option, + inner: T, } #[derive(Debug)] @@ -145,10 +143,10 @@ where } // Call the inner future - let inner = Some(self.inner.call(request)); + let inner = self.inner.call(request); ResponseFuture { inner } } - State::Limited(..) => ResponseFuture { inner: None }, + State::Limited(..) => panic!("service not ready; poll_ready must be called first"), } } } @@ -161,10 +159,7 @@ where type Error = Error; fn poll(&mut self) -> Poll { - match self.inner { - Some(ref mut f) => f.poll().map_err(Error::Upstream), - None => Err(Error::RateLimit), - } + self.inner.poll().map_err(Error::Upstream) } } diff --git a/tower-rate-limit/tests/rate_limit.rs b/tower-rate-limit/tests/rate_limit.rs index 0edb7e6..6696011 100644 --- a/tower-rate-limit/tests/rate_limit.rs +++ b/tower-rate-limit/tests/rate_limit.rs @@ -25,14 +25,11 @@ fn reaching_capacity() { let response = rt.block_on(response); assert_eq!(response.unwrap(), "world"); - // Sending another request is rejected - let response = service.call("no"); - - let poll_request = rt.block_on(future::lazy(|| handle.poll_request())); - assert!(poll_request.unwrap().is_not_ready()); - - let response = rt.block_on(response); - assert!(response.is_err()); + rt.block_on(future::lazy(|| { + assert!(service.poll_ready().unwrap().is_not_ready()); + Ok::<_, ()>(()) + })) + .unwrap(); let poll_request = rt.block_on(future::lazy(|| handle.poll_request())); assert!(poll_request.unwrap().is_not_ready()); diff --git a/tower-reconnect/src/lib.rs b/tower-reconnect/src/lib.rs index 87bbcfb..80505a3 100644 --- a/tower-reconnect/src/lib.rs +++ b/tower-reconnect/src/lib.rs @@ -23,11 +23,10 @@ where pub enum Error { Service(T), Connect(U), - NotReady, } pub struct ResponseFuture { - inner: Option, + inner: F, _connect_error_marker: PhantomData E>, } @@ -135,11 +134,11 @@ where fn call(&mut self, request: Request) -> Self::Future { let service = match self.state { State::Connected(ref mut service) => service, - _ => return ResponseFuture::new(None), + _ => panic!("service not ready; poll_ready must be called first"), }; let fut = service.call(request); - ResponseFuture::new(Some(fut)) + ResponseFuture::new(fut) } } @@ -162,7 +161,7 @@ where // ===== impl ResponseFuture ===== impl ResponseFuture { - fn new(inner: Option) -> Self { + fn new(inner: F) -> Self { ResponseFuture { inner, _connect_error_marker: PhantomData, @@ -178,10 +177,7 @@ where type Error = Error; fn poll(&mut self) -> Poll { - match self.inner { - Some(ref mut f) => f.poll().map_err(Error::Service), - None => Err(Error::NotReady), - } + self.inner.poll().map_err(Error::Service) } } @@ -196,7 +192,6 @@ where match *self { Error::Service(ref why) => fmt::Display::fmt(why, f), Error::Connect(ref why) => write!(f, "connection failed: {}", why), - Error::NotReady => f.pad("not ready"), } } } @@ -210,7 +205,6 @@ where match *self { Error::Service(ref why) => Some(why), Error::Connect(ref why) => Some(why), - Error::NotReady => None, } } @@ -218,7 +212,6 @@ where match *self { Error::Service(_) => "inner service error", Error::Connect(_) => "connection failed", - Error::NotReady => "not ready", } } } diff --git a/tower-service/src/lib.rs b/tower-service/src/lib.rs index 7a4dc46..f4e8c7c 100644 --- a/tower-service/src/lib.rs +++ b/tower-service/src/lib.rs @@ -171,20 +171,26 @@ pub trait Service { /// is notified when the service becomes ready again. This function is /// expected to be called while on a task. /// - /// This is a **best effort** implementation. False positives are permitted. - /// It is permitted for the service to return `Ready` from a `poll_ready` - /// call and the next invocation of `call` results in an error. + /// If `Err` is returned, the service is no longer able to service requests + /// and the caller should discard the service instance. + /// + /// Once `poll_ready` returns `Ready`, a request may be dispatched to the + /// service using `call`. Until a request is dispatched, repeated calls to + /// `poll_ready` must return either `Ready` or `Err`. fn poll_ready(&mut self) -> Poll<(), Self::Error>; /// Process the request and return the response asynchronously. /// /// This function is expected to be callable off task. As such, - /// implementations should take care to not call `poll_ready`. If the - /// service is at capacity and the request is unable to be handled, the - /// returned `Future` should resolve to an error. + /// implementations should take care to not call `poll_ready`. /// - /// Calling `call` without calling `poll_ready` is permitted. The - /// implementation must be resilient to this fact. + /// Before dispatching a request, `poll_ready` must be called and return + /// `Ready`. + /// + /// # Panics + /// + /// Implementations are permitted to panic if `call` is invoked without + /// obtaining `Ready` from `poll_ready`. fn call(&mut self, req: Request) -> Self::Future; }