Make Ready only take Service by reference (#340)

Rather than consuming `self` and returning `(Self, _)`. This did mean
that a few crates that depended on `Ready` to own the `Service` and
provide it once it was ready had to change to call `poll_ready`
directly. Which in turn meant adding in some PhantomData<Request> so
that the impl blocks wouldn't be under-constrainted. Take, for example:

```
impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S>
```

would fail to compile with

```
error[E0207]: the type parameter `Req` is not constrained by the impl trait, self type, or predicates
```
This commit is contained in:
Jon Gjengset 2019-09-11 15:49:51 -04:00 committed by GitHub
parent 3d642f5ca0
commit 395889c763
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 44 additions and 42 deletions

View File

@ -40,7 +40,6 @@ tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" }
tower-load = { version = "=0.3.0-alpha.1", path = "../tower-load" }
tower-service = "=0.3.0-alpha.1"
tower-make = { version = "=0.3.0-alpha.1", path = "../tower-make" }
tower-util = { version = "=0.3.0-alpha.1", path = "../tower-util" }
slab = "0.4"
[dev-dependencies]

View File

@ -4,6 +4,7 @@ use futures_util::{stream, try_future, try_future::TryFutureExt};
use indexmap::IndexMap;
use pin_project::pin_project;
use rand::{rngs::SmallRng, FromEntropy};
use std::marker::PhantomData;
use std::{
future::Future,
pin::Pin,
@ -13,7 +14,6 @@ use tokio_sync::oneshot;
use tower_discover::{Change, Discover};
use tower_load::Load;
use tower_service::Service;
use tower_util::Ready;
use tracing::{debug, trace};
/// Distributes requests across inner services using the [Power of Two Choices][p2c].
@ -50,6 +50,8 @@ pub struct Balance<D: Discover, Req> {
next_ready_index: Option<usize>,
rng: SmallRng,
_req: PhantomData<Req>,
}
#[pin_project]
@ -61,8 +63,9 @@ struct UnreadyService<K, S, Req> {
key: Option<K>,
#[pin]
cancel: oneshot::Receiver<()>,
#[pin]
ready: tower_util::Ready<S, Req>,
service: Option<S>,
_req: PhantomData<Req>,
}
enum Error<E> {
@ -84,6 +87,8 @@ where
cancelations: IndexMap::default(),
unready_services: stream::FuturesUnordered::new(),
next_ready_index: None,
_req: PhantomData,
}
}
@ -139,8 +144,9 @@ where
self.cancelations.insert(key.clone(), tx);
self.unready_services.push(UnreadyService {
key: Some(key),
ready: Ready::new(svc),
service: Some(svc),
cancel: rx,
_req: PhantomData,
});
}
@ -342,15 +348,18 @@ impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
return Poll::Ready(Err((key, Error::Canceled)));
}
match ready!(this.ready.poll(cx)) {
Ok(svc) => {
let key = this.key.take().expect("polled after ready");
Poll::Ready(Ok((key, svc)))
}
Err(e) => {
let key = this.key.take().expect("polled after ready");
Poll::Ready(Err((key, Error::Inner(e))))
}
let res = ready!(this
.service
.as_mut()
.expect("poll after ready")
.poll_ready(cx));
let key = this.key.take().expect("polled after ready");
let svc = this.service.take().expect("polled after ready");
match res {
Ok(()) => Poll::Ready(Ok((key, svc))),
Err(e) => Poll::Ready(Err((key, Error::Inner(e)))),
}
}
}

View File

@ -28,7 +28,6 @@ futures-util-preview = "=0.3.0-alpha.18"
pin-project = "=0.4.0-alpha.11"
tower-service = "=0.3.0-alpha.1"
tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" }
tower-util = { version = "=0.3.0-alpha.1", path = "../tower-util" }
tokio-executor = "=0.2.0-alpha.4"
tokio-sync = "=0.2.0-alpha.4"

View File

@ -3,6 +3,7 @@
use crate::error::Error;
use futures_core::ready;
use pin_project::pin_project;
use std::marker::PhantomData;
use std::{
future::Future,
pin::Pin,
@ -11,18 +12,13 @@ use std::{
use tokio_executor::TypedExecutor;
use tokio_sync::oneshot;
use tower_service::Service;
use tower_util::Ready;
#[pin_project]
/// Drives a service to readiness.
pub struct BackgroundReady<T, Request>
where
T: Service<Request>,
T::Error: Into<Error>,
{
#[pin]
ready: Ready<T, Request>,
pub struct BackgroundReady<T, Request> {
service: Option<T>,
tx: Option<oneshot::Sender<Result<T, Error>>>,
_req: PhantomData<Request>,
}
/// This trait allows you to use either Tokio's threaded runtime's executor or
@ -55,8 +51,9 @@ where
{
let (tx, rx) = oneshot::channel();
let bg = BackgroundReady {
ready: Ready::new(service),
service: Some(service),
tx: Some(tx),
_req: PhantomData,
};
(bg, rx)
}
@ -75,7 +72,9 @@ where
return Poll::Ready(());
}
let result = ready!(this.ready.poll(cx));
let result = ready!(this.service.as_mut().expect("illegal state").poll_ready(cx))
.map(|()| this.service.take().expect("illegal state"));
let _ = this
.tx
.take()

View File

@ -13,42 +13,38 @@ use tower_service::Service;
///
/// `Ready` values are produced by `ServiceExt::ready`.
#[pin_project]
pub struct Ready<T, Request> {
inner: Option<T>,
pub struct Ready<'a, T, Request> {
inner: &'a mut T,
_p: PhantomData<fn() -> Request>,
}
impl<T, Request> Ready<T, Request>
impl<'a, T, Request> Ready<'a, T, Request>
where
T: Service<Request>,
{
pub fn new(service: T) -> Self {
pub fn new(service: &'a mut T) -> Self {
Ready {
inner: Some(service),
inner: service,
_p: PhantomData,
}
}
}
impl<T, Request> Future for Ready<T, Request>
impl<'a, T, Request> Future for Ready<'a, T, Request>
where
T: Service<Request>,
{
type Output = Result<T, T::Error>;
type Output = Result<(), T::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
ready!(this
.inner
.as_mut()
.expect("called `poll` after future completed")
.poll_ready(cx))?;
ready!(this.inner.poll_ready(cx))?;
Poll::Ready(Ok(this.inner.take().unwrap()))
Poll::Ready(Ok(()))
}
}
impl<T, Request> fmt::Debug for Ready<T, Request>
impl<'a, T, Request> fmt::Debug for Ready<'a, T, Request>
where
T: fmt::Debug,
{

View File

@ -14,7 +14,7 @@ type Error = Box<dyn std::error::Error + Send + Sync>;
/// adapters
pub trait ServiceExt<Request>: Service<Request> {
/// A future yielding the service when it is ready to accept a request.
fn ready(self) -> Ready<Self, Request>
fn ready(&mut self) -> Ready<'_, Self, Request>
where
Self: Sized,
{

View File

@ -17,7 +17,7 @@ async fn builder_service() {
pin_mut!(handle);
let policy = MockPolicy;
let client = ServiceBuilder::new()
let mut client = ServiceBuilder::new()
.layer(BufferLayer::new(5))
.layer(ConcurrencyLimitLayer::new(5))
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
@ -28,7 +28,7 @@ async fn builder_service() {
// allow a request through
handle.allow(1);
let mut client = client.ready().await.unwrap();
client.ready().await.unwrap();
let fut = client.call("hello");
let (request, rsp) = poll_fn(|cx| handle.as_mut().poll_request(cx))
.await