Require `poll_ready` to be called before `call` (#161)

This updates the `Service` contract requiring `poll_ready` to be called
before `call`. This allows `Service::call` to panic in the event the
user of the service omits `poll_ready` or does not wait until `Ready` is
observed.
This commit is contained in:
Carl Lerche 2019-02-21 12:18:56 -08:00 committed by GitHub
parent 79349816da
commit f42338934a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 87 additions and 123 deletions

View File

@ -53,7 +53,6 @@ pub struct Balance<D: Discover, C> {
pub enum Error<T, U> {
Inner(T),
Balance(U),
NotReady,
}
pub struct ResponseFuture<F: Future, E>(F, PhantomData<E>);
@ -357,7 +356,6 @@ where
match *self {
Error::Inner(ref why) => fmt::Display::fmt(why, f),
Error::Balance(ref why) => write!(f, "load balancing failed: {}", why),
Error::NotReady => f.pad("not ready"),
}
}
}
@ -371,7 +369,6 @@ where
match *self {
Error::Inner(ref why) => Some(why),
Error::Balance(ref why) => Some(why),
_ => None,
}
}
@ -379,7 +376,6 @@ where
match *self {
Error::Inner(_) => "inner service error",
Error::Balance(_) => "load balancing failed",
Error::NotReady => "not ready",
}
}
}

View File

@ -58,8 +58,6 @@ pub enum Error<E> {
Inner(E),
/// The underlying `Service` failed. All subsequent requests will fail.
Closed(Arc<ServiceError<E>>),
/// The underlying `Service` is currently at capacity; wait for `poll_ready`.
Full,
}
mod sealed {
@ -113,7 +111,6 @@ struct State<E> {
}
enum ResponseState<T, E> {
Full,
Failed(Arc<ServiceError<E>>),
Rx(oneshot::Receiver<Result<T, Arc<ServiceError<E>>>>),
Poll(T),
@ -202,9 +199,14 @@ where
state: ResponseState::Failed(self.get_error_on_closed()),
}
} else {
ResponseFuture {
state: ResponseState::Full,
}
// When `mpsc::Sender::poll_ready` returns `Ready`, a slot
// in the channel is reserved for the handle. Other `Sender`
// handles may not send a message using that slot. This
// guarantees capacity for `request`.
//
// Given this, the only way to hit this code path is if
// `poll_ready` has not been called & `Ready` returned.
panic!("buffer full; poll_ready must be called first");
}
}
Ok(_) => ResponseFuture {
@ -242,9 +244,6 @@ where
let fut;
match self.state {
Full => {
return Err(Error::Full);
}
Failed(ref e) => {
return Err(Error::Closed(e.clone()));
}
@ -433,7 +432,6 @@ where
match *self {
Error::Inner(ref why) => fmt::Display::fmt(why, f),
Error::Closed(ref e) => write!(f, "Service::{} failed: {}", e.method, e.inner),
Error::Full => f.pad("Service at capacity"),
}
}
}
@ -446,7 +444,6 @@ where
match *self {
Error::Inner(ref why) => Some(why),
Error::Closed(ref e) => Some(&e.inner),
Error::Full => None,
}
}
@ -454,7 +451,6 @@ where
match *self {
Error::Inner(ref e) => e.description(),
Error::Closed(ref e) => e.inner.description(),
Error::Full => "Service as capacity",
}
}
}

View File

@ -25,7 +25,7 @@ pub struct ResponseFuture<T, S, Request>
where
S: Service<Request>,
{
inner: Option<ResponseInner<T, S, Request>>,
inner: ResponseInner<T, S, Request>,
}
#[derive(Debug)]
@ -47,9 +47,6 @@ pub enum Error<T, U> {
/// The inner service produced an error.
Inner(U),
/// The service is out of capacity.
NoCapacity,
}
/// Checks a request
@ -74,7 +71,7 @@ enum State<Request, U> {
Check(Request),
WaitReady(Request),
WaitResponse(U),
NoCapacity,
Invalid,
}
// ===== impl Filter =====
@ -125,7 +122,7 @@ where
let rem = self.counts.rem.load(SeqCst);
if rem == 0 {
return ResponseFuture { inner: None };
panic!("service not ready; poll_ready must be called first");
}
// Decrement
@ -141,12 +138,12 @@ where
let counts = self.counts.clone();
ResponseFuture {
inner: Some(ResponseInner {
inner: ResponseInner {
state: State::Check(request),
check,
service,
counts,
}),
},
}
}
}
@ -177,10 +174,7 @@ where
type Error = Error<T::Error, S::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner {
Some(ref mut inner) => inner.poll(),
None => Err(Error::NoCapacity),
}
self.inner.poll()
}
}
@ -215,7 +209,7 @@ where
use self::State::*;
loop {
match mem::replace(&mut self.state, NoCapacity) {
match mem::replace(&mut self.state, Invalid) {
Check(request) => {
// Poll predicate
match self.check.poll() {
@ -258,8 +252,8 @@ where
return ret;
}
NoCapacity => {
return Err(Error::NoCapacity);
Invalid => {
panic!("invalid state");
}
}
}

View File

@ -22,13 +22,12 @@ pub struct InFlightLimit<T> {
/// Error returned when the service has reached its limit.
#[derive(Debug)]
pub enum Error<T> {
NoCapacity,
Upstream(T),
}
#[derive(Debug)]
pub struct ResponseFuture<T> {
inner: Option<T>,
inner: T,
shared: Arc<Shared>,
}
@ -114,15 +113,12 @@ where
} else {
// Try to reserve
if !self.state.shared.reserve() {
return ResponseFuture {
inner: None,
shared: self.state.shared.clone(),
};
panic!("service not ready; call poll_ready first");
}
}
ResponseFuture {
inner: Some(self.inner.call(request)),
inner: self.inner.call(request),
shared: self.state.shared.clone(),
}
}
@ -140,35 +136,19 @@ where
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
use futures::Async::*;
let res = match self.inner {
Some(ref mut f) => match f.poll() {
Ok(Ready(v)) => {
self.shared.release();
Ok(Ready(v))
}
Ok(NotReady) => {
return Ok(NotReady);
}
Err(e) => {
self.shared.release();
Err(Error::Upstream(e))
}
},
None => Err(Error::NoCapacity),
};
// Drop the inner future
self.inner = None;
res
match self.inner.poll() {
Ok(Ready(v)) => Ok(Ready(v)),
Ok(NotReady) => {
return Ok(NotReady);
}
Err(e) => Err(Error::Upstream(e)),
}
}
}
impl<T> Drop for ResponseFuture<T> {
fn drop(&mut self) {
if self.inner.is_some() {
self.shared.release();
}
self.shared.release();
}
}
@ -238,7 +218,6 @@ where
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Upstream(ref why) => fmt::Display::fmt(why, f),
Error::NoCapacity => write!(f, "in-flight limit exceeded"),
}
}
}
@ -248,17 +227,14 @@ where
T: error::Error,
{
fn cause(&self) -> Option<&error::Error> {
if let Error::Upstream(ref why) = *self {
Some(why)
} else {
None
match *self {
Error::Upstream(ref why) => Some(why),
}
}
fn description(&self) -> &str {
match *self {
Error::Upstream(_) => "upstream service error",
Error::NoCapacity => "in-flight limit exceeded",
}
}
}

View File

@ -68,6 +68,7 @@ fn basic_service_limit_functionality_with_poll_ready() {
}
#[test]
#[should_panic]
fn basic_service_limit_functionality_without_poll_ready() {
let mut task = MockTask::new();
@ -112,6 +113,7 @@ fn basic_service_limit_functionality_without_poll_ready() {
}
#[test]
#[should_panic]
fn request_without_capacity() {
let mut task = MockTask::new();

View File

@ -52,7 +52,6 @@ pub struct ResponseFuture<T, E> {
#[derive(Debug, PartialEq)]
pub enum Error<T> {
Closed,
NoCapacity,
Other(T),
}
@ -148,9 +147,7 @@ impl<T, U, E> Service<T> for Mock<T, U, E> {
if !self.can_send {
if state.rem == 0 {
return ResponseFuture {
rx: Error::NoCapacity,
};
panic!("service not ready; poll_ready must be called first");
}
}
@ -206,7 +203,17 @@ impl<T, U, E> Clone for Mock<T, U, E> {
impl<T, U, E> Drop for Mock<T, U, E> {
fn drop(&mut self) {
let mut state = self.state.lock().unwrap();
let mut state = match self.state.lock() {
Ok(v) => v,
Err(e) => {
if ::std::thread::panicking() {
return;
}
panic!("{:?}", e);
}
};
state.tasks.remove(&self.id);
}
}
@ -252,7 +259,17 @@ impl<T, U, E> Handle<T, U, E> {
impl<T, U, E> Drop for Handle<T, U, E> {
fn drop(&mut self) {
let mut state = self.state.lock().unwrap();
let mut state = match self.state.lock() {
Ok(v) => v,
Err(e) => {
if ::std::thread::panicking() {
return;
}
panic!("{:?}", e);
}
};
state.is_closed = true;
for (_, task) in state.tasks.drain() {
@ -314,7 +331,6 @@ impl<T, E> Future for ResponseFuture<T, E> {
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => Err(Error::Closed),
},
Error::NoCapacity => Err(Error::NoCapacity),
Error::Closed => Err(Error::Closed),
}
}

View File

@ -35,6 +35,7 @@ fn single_request_ready() {
}
#[test]
#[should_panic]
fn backpressure() {
let (mut mock, mut handle) = new_mock();
@ -46,15 +47,7 @@ fn backpressure() {
});
// Try to send a request
let response = mock.call("hello?".into());
// Did not send
with_task(|| {
assert!(handle.poll_request().unwrap().is_not_ready());
});
// Response is an error
assert!(response.wait().is_err());
mock.call("hello?".into());
}
type Mock = tower_mock::Mock<String, String, ()>;

View File

@ -1,7 +1,5 @@
//! Tower middleware that applies a timeout to requests.
//!
//! If the response does not complete within the specified timeout, the response
//! will be aborted.
//! A Tower middleware that rate limits the requests that are passed to the
//! inner service.
#[macro_use]
extern crate futures;
@ -38,7 +36,7 @@ pub enum Error<T> {
}
pub struct ResponseFuture<T> {
inner: Option<T>,
inner: T,
}
#[derive(Debug)]
@ -145,10 +143,10 @@ where
}
// Call the inner future
let inner = Some(self.inner.call(request));
let inner = self.inner.call(request);
ResponseFuture { inner }
}
State::Limited(..) => ResponseFuture { inner: None },
State::Limited(..) => panic!("service not ready; poll_ready must be called first"),
}
}
}
@ -161,10 +159,7 @@ where
type Error = Error<T::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner {
Some(ref mut f) => f.poll().map_err(Error::Upstream),
None => Err(Error::RateLimit),
}
self.inner.poll().map_err(Error::Upstream)
}
}

View File

@ -25,14 +25,11 @@ fn reaching_capacity() {
let response = rt.block_on(response);
assert_eq!(response.unwrap(), "world");
// Sending another request is rejected
let response = service.call("no");
let poll_request = rt.block_on(future::lazy(|| handle.poll_request()));
assert!(poll_request.unwrap().is_not_ready());
let response = rt.block_on(response);
assert!(response.is_err());
rt.block_on(future::lazy(|| {
assert!(service.poll_ready().unwrap().is_not_ready());
Ok::<_, ()>(())
}))
.unwrap();
let poll_request = rt.block_on(future::lazy(|| handle.poll_request()));
assert!(poll_request.unwrap().is_not_ready());

View File

@ -23,11 +23,10 @@ where
pub enum Error<T, U> {
Service(T),
Connect(U),
NotReady,
}
pub struct ResponseFuture<F, E> {
inner: Option<F>,
inner: F,
_connect_error_marker: PhantomData<fn() -> E>,
}
@ -135,11 +134,11 @@ where
fn call(&mut self, request: Request) -> Self::Future {
let service = match self.state {
State::Connected(ref mut service) => service,
_ => return ResponseFuture::new(None),
_ => panic!("service not ready; poll_ready must be called first"),
};
let fut = service.call(request);
ResponseFuture::new(Some(fut))
ResponseFuture::new(fut)
}
}
@ -162,7 +161,7 @@ where
// ===== impl ResponseFuture =====
impl<F, E> ResponseFuture<F, E> {
fn new(inner: Option<F>) -> Self {
fn new(inner: F) -> Self {
ResponseFuture {
inner,
_connect_error_marker: PhantomData,
@ -178,10 +177,7 @@ where
type Error = Error<F::Error, E>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner {
Some(ref mut f) => f.poll().map_err(Error::Service),
None => Err(Error::NotReady),
}
self.inner.poll().map_err(Error::Service)
}
}
@ -196,7 +192,6 @@ where
match *self {
Error::Service(ref why) => fmt::Display::fmt(why, f),
Error::Connect(ref why) => write!(f, "connection failed: {}", why),
Error::NotReady => f.pad("not ready"),
}
}
}
@ -210,7 +205,6 @@ where
match *self {
Error::Service(ref why) => Some(why),
Error::Connect(ref why) => Some(why),
Error::NotReady => None,
}
}
@ -218,7 +212,6 @@ where
match *self {
Error::Service(_) => "inner service error",
Error::Connect(_) => "connection failed",
Error::NotReady => "not ready",
}
}
}

View File

@ -171,20 +171,26 @@ pub trait Service<Request> {
/// is notified when the service becomes ready again. This function is
/// expected to be called while on a task.
///
/// This is a **best effort** implementation. False positives are permitted.
/// It is permitted for the service to return `Ready` from a `poll_ready`
/// call and the next invocation of `call` results in an error.
/// If `Err` is returned, the service is no longer able to service requests
/// and the caller should discard the service instance.
///
/// Once `poll_ready` returns `Ready`, a request may be dispatched to the
/// service using `call`. Until a request is dispatched, repeated calls to
/// `poll_ready` must return either `Ready` or `Err`.
fn poll_ready(&mut self) -> Poll<(), Self::Error>;
/// Process the request and return the response asynchronously.
///
/// This function is expected to be callable off task. As such,
/// implementations should take care to not call `poll_ready`. If the
/// service is at capacity and the request is unable to be handled, the
/// returned `Future` should resolve to an error.
/// implementations should take care to not call `poll_ready`.
///
/// Calling `call` without calling `poll_ready` is permitted. The
/// implementation must be resilient to this fact.
/// Before dispatching a request, `poll_ready` must be called and return
/// `Ready`.
///
/// # Panics
///
/// Implementations are permitted to panic if `call` is invoked without
/// obtaining `Ready` from `poll_ready`.
fn call(&mut self, req: Request) -> Self::Future;
}