From 0802ca2bce9c51c0a20d10532d1804074e362dc9 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Tue, 10 Sep 2019 14:51:07 -0400 Subject: [PATCH] Update tower-util and tower to std::future (#330) This bumps tower-util and tower to 0.3.0-alpha.1 --- Cargo.toml | 2 +- tower-test/Cargo.toml | 2 +- tower-test/src/mock/future.rs | 18 ++-- tower-util/CHANGELOG.md | 4 + tower-util/Cargo.toml | 16 ++-- tower-util/src/boxed/mod.rs | 8 +- tower-util/src/boxed/sync.rs | 20 +++-- tower-util/src/boxed/unsync.rs | 20 +++-- tower-util/src/call_all/common.rs | 90 ++++++++++++------- tower-util/src/call_all/ordered.rs | 123 ++++++++++++++------------ tower-util/src/call_all/unordered.rs | 45 +++++++--- tower-util/src/either.rs | 48 +++++----- tower-util/src/lib.rs | 54 +++++++----- tower-util/src/oneshot.rs | 61 ++++++------- tower-util/src/optional/future.rs | 27 +++--- tower-util/src/optional/mod.rs | 13 +-- tower-util/src/ready.rs | 28 +++--- tower-util/tests/call_all.rs | 127 ++++++++++++++------------- tower-util/tests/service_fn.rs | 12 ++- tower/CHANGELOG.md | 4 + tower/Cargo.toml | 33 +++---- tower/src/builder/mod.rs | 63 +++---------- tower/src/lib.rs | 4 +- tower/src/util.rs | 3 +- tower/tests/builder.rs | 61 +++++-------- 25 files changed, 466 insertions(+), 420 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 02a15e1..8dbaf5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] members = [ - # "tower", + "tower", # "tower-balance", "tower-buffer", "tower-discover", diff --git a/tower-test/Cargo.toml b/tower-test/Cargo.toml index cec974d..eee52ff 100644 --- a/tower-test/Cargo.toml +++ b/tower-test/Cargo.toml @@ -27,4 +27,4 @@ futures-executor-preview = "0.3.0-alpha.18" tokio-test = "0.2.0-alpha.2" tokio-sync = "0.2.0-alpha.2" tower-service = "0.3.0-alpha.1" -pin-utils = "0.1.0-alpha.4" \ No newline at end of file +pin-project = "0.4.0-alpha.10" diff --git a/tower-test/src/mock/future.rs b/tower-test/src/mock/future.rs index c01754a..4517f63 100644 --- a/tower-test/src/mock/future.rs +++ b/tower-test/src/mock/future.rs @@ -1,6 +1,8 @@ //! Future types use crate::mock::error::{self, Error}; +use futures_util::ready; +use pin_project::pin_project; use tokio_sync::oneshot; use std::{ @@ -10,16 +12,16 @@ use std::{ }; /// Future of the `Mock` response. +#[pin_project] #[derive(Debug)] pub struct ResponseFuture { + #[pin] rx: Option>, } type Rx = oneshot::Receiver>; impl ResponseFuture { - pin_utils::unsafe_pinned!(rx: Option>); - pub(crate) fn new(rx: Rx) -> ResponseFuture { ResponseFuture { rx: Some(rx) } } @@ -32,13 +34,11 @@ impl ResponseFuture { impl Future for ResponseFuture { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - match self.rx().as_pin_mut() { - Some(rx) => match rx.poll(cx) { - Poll::Ready(Ok(Ok(v))) => Poll::Ready(Ok(v)), - Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e)), - Poll::Ready(Err(_)) => Poll::Ready(Err(error::Closed::new().into())), - Poll::Pending => Poll::Pending, + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.project().rx.as_pin_mut() { + Some(rx) => match ready!(rx.poll(cx)) { + Ok(r) => Poll::Ready(r), + Err(_) => Poll::Ready(Err(error::Closed::new().into())), }, None => Poll::Ready(Err(error::Closed::new().into())), } diff --git a/tower-util/CHANGELOG.md b/tower-util/CHANGELOG.md index 24a6a34..0730e7b 100644 --- a/tower-util/CHANGELOG.md +++ b/tower-util/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.3.0-alpha.1 + +- Move to `std::future` + # 0.1.0 (April 26, 2019) - Initial release diff --git a/tower-util/Cargo.toml b/tower-util/Cargo.toml index 981c603..f3b36ca 100644 --- a/tower-util/Cargo.toml +++ b/tower-util/Cargo.toml @@ -15,7 +15,7 @@ license = "MIT" readme = "README.md" repository = "https://github.com/tower-rs/tower" homepage = "https://github.com/tower-rs/tower" -documentation = "https://docs.rs/tower-util/0.1.0" +documentation = "https://docs.rs/tower-util/0.3.0-alpha.1" description = """ Utilities for working with `Service`. """ @@ -24,10 +24,14 @@ edition = "2018" [dependencies] tower-service = "=0.3.0-alpha.1" -# tower-layer = "0.1.0" +tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" } +pin-project = { version = "0.4.0-alpha.10", features = ["project_attr"] } +futures-util-preview = "0.3.0-alpha.18" +futures-core-preview = "0.3.0-alpha.18" [dev-dependencies] -futures = { version = "=0.3.0-alpha.18", package = "futures-preview"} -# tokio-mock-task = "0.1.1" -# tower = { version = "0.1.0", path = "../tower" } -# tower-test = { version = "0.1.0", path = "../tower-test" } +futures-util-preview = "0.3.0-alpha.18" +tokio-test = "0.2.0-alpha.1" +tokio = "0.2.0-alpha.1" +tower = { version = "0.3.0-alpha.1", path = "../tower" } +tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" } diff --git a/tower-util/src/boxed/mod.rs b/tower-util/src/boxed/mod.rs index 8d46788..9a55ab7 100644 --- a/tower-util/src/boxed/mod.rs +++ b/tower-util/src/boxed/mod.rs @@ -14,18 +14,14 @@ //! # Examples //! //! ``` -//! # extern crate futures; -//! # extern crate tower_service; -//! # extern crate tower_util; -//! # use futures::*; -//! # use futures::future::FutureResult; +//! use futures_util::future::ready; //! # use tower_service::Service; //! # use tower_util::{BoxService, service_fn}; //! // Respond to requests using a closure, but closures cannot be named... //! # pub fn main() { //! let svc = service_fn(|mut request: String| { //! request.push_str(" response"); -//! Ok(request) +//! ready(Ok(request)) //! }); //! //! let service: BoxService = BoxService::new(svc); diff --git a/tower-util/src/boxed/sync.rs b/tower-util/src/boxed/sync.rs index f5d38db..e7926ec 100644 --- a/tower-util/src/boxed/sync.rs +++ b/tower-util/src/boxed/sync.rs @@ -1,7 +1,11 @@ -use futures::{Future, Poll}; use tower_service::Service; use std::fmt; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; /// A boxed `Service + Send` trait object. /// @@ -18,7 +22,7 @@ pub struct BoxService { /// /// This type alias represents a boxed future that is `Send` and can be moved /// across threads. -type BoxFuture = Box + Send>; +type BoxFuture = Pin> + Send>>; #[derive(Debug)] struct Boxed { @@ -41,8 +45,8 @@ impl Service for BoxService { type Error = E; type Future = BoxFuture; - fn poll_ready(&mut self) -> Poll<(), E> { - self.inner.poll_ready() + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) } fn call(&mut self, request: T) -> BoxFuture { @@ -68,13 +72,13 @@ where { type Response = S::Response; type Error = S::Error; - type Future = Box + Send>; + type Future = Pin> + Send>>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready() + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) } fn call(&mut self, request: Request) -> Self::Future { - Box::new(self.inner.call(request)) + Box::pin(self.inner.call(request)) } } diff --git a/tower-util/src/boxed/unsync.rs b/tower-util/src/boxed/unsync.rs index c82990a..ff8f3f6 100644 --- a/tower-util/src/boxed/unsync.rs +++ b/tower-util/src/boxed/unsync.rs @@ -1,7 +1,11 @@ -use futures::{Future, Poll}; use tower_service::Service; use std::fmt; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; /// A boxed `Service` trait object. pub struct UnsyncBoxService { @@ -12,7 +16,7 @@ pub struct UnsyncBoxService { /// /// This type alias represents a boxed future that is *not* `Send` and must /// remain on the current thread. -type UnsyncBoxFuture = Box>; +type UnsyncBoxFuture = Pin>>>; #[derive(Debug)] struct UnsyncBoxed { @@ -35,8 +39,8 @@ impl Service for UnsyncBoxService { type Error = E; type Future = UnsyncBoxFuture; - fn poll_ready(&mut self) -> Poll<(), E> { - self.inner.poll_ready() + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) } fn call(&mut self, request: T) -> UnsyncBoxFuture { @@ -62,13 +66,13 @@ where { type Response = S::Response; type Error = S::Error; - type Future = Box>; + type Future = Pin>>>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready() + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) } fn call(&mut self, request: Request) -> Self::Future { - Box::new(self.inner.call(request)) + Box::pin(self.inner.call(request)) } } diff --git a/tower-util/src/call_all/common.rs b/tower-util/src/call_all/common.rs index a9cde5c..8f1e331 100644 --- a/tower-util/src/call_all/common.rs +++ b/tower-util/src/call_all/common.rs @@ -1,22 +1,31 @@ use super::Error; -use futures::{try_ready, Async, Future, Poll, Stream}; +use futures_core::{ready, Stream}; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use tower_service::Service; /// TODO: Dox +#[pin_project] #[derive(Debug)] pub(crate) struct CallAll { - service: Svc, + service: Option, + #[pin] stream: S, queue: Q, eof: bool, } -pub(crate) trait Drive { +pub(crate) trait Drive { fn is_empty(&self) -> bool; - fn push(&mut self, future: T); + fn push(&mut self, future: F); - fn poll(&mut self) -> Poll, T::Error>; + // NOTE: this implicitly requires Self: Unpin just like Service does + fn poll(&mut self, cx: &mut Context<'_>) -> Poll>; } impl CallAll @@ -24,12 +33,11 @@ where Svc: Service, Svc::Error: Into, S: Stream, - S::Error: Into, Q: Drive, { pub(crate) fn new(service: Svc, stream: S, queue: Q) -> CallAll { CallAll { - service, + service: Some(service), stream, queue, eof: false, @@ -37,59 +45,73 @@ where } /// Extract the wrapped `Service`. - pub(crate) fn into_inner(self) -> Svc { - self.service + pub(crate) fn into_inner(mut self) -> Svc { + self.service.take().expect("Service already taken") } - pub(crate) fn unordered(self) -> super::CallAllUnordered { + /// Extract the wrapped `Service`. + pub(crate) fn take_service(mut self: Pin<&mut Self>) -> Svc { + self.project() + .service + .take() + .expect("Service already taken") + } + + pub(crate) fn unordered(mut self) -> super::CallAllUnordered { assert!(self.queue.is_empty() && !self.eof); - super::CallAllUnordered::new(self.service, self.stream) + super::CallAllUnordered::new(self.service.take().unwrap(), self.stream) } } impl Stream for CallAll where Svc: Service, - Svc::Error: Into, + Error: From, S: Stream, - S::Error: Into, Q: Drive, { - type Item = Svc::Response; - type Error = Error; + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); - fn poll(&mut self) -> Poll, Self::Error> { loop { - let res = self.queue.poll().map_err(Into::into); - // First, see if we have any responses to yield - if let Async::Ready(Some(rsp)) = res? { - return Ok(Async::Ready(Some(rsp))); + if let Poll::Ready(r) = this.queue.poll(cx) { + if let Some(rsp) = r.transpose()? { + return Poll::Ready(Some(Ok(rsp))); + } } // If there are no more requests coming, check if we're done - if self.eof { - if self.queue.is_empty() { - return Ok(Async::Ready(None)); + if *this.eof { + if this.queue.is_empty() { + return Poll::Ready(None); } else { - return Ok(Async::NotReady); + return Poll::Pending; } } // Then, see that the service is ready for another request - try_ready!(self.service.poll_ready().map_err(Into::into)); + let svc = this + .service + .as_mut() + .expect("Using CallAll after extracing inner Service"); + ready!(svc.poll_ready(cx))?; // If it is, gather the next request (if there is one) - match self.stream.poll().map_err(Into::into)? { - Async::Ready(Some(req)) => { - self.queue.push(self.service.call(req)); - } - Async::Ready(None) => { - // We're all done once any outstanding requests have completed - self.eof = true; - } - Async::NotReady => { + match this.stream.as_mut().poll_next(cx) { + Poll::Ready(r) => match r { + Some(req) => { + this.queue.push(svc.call(req)); + } + None => { + // We're all done once any outstanding requests have completed + *this.eof = true; + } + }, + Poll::Pending => { // TODO: We probably want to "release" the slot we reserved in Svc here. // It may be a while until we get around to actually using it. } diff --git a/tower-util/src/call_all/ordered.rs b/tower-util/src/call_all/ordered.rs index a6f69c9..87b7e84 100644 --- a/tower-util/src/call_all/ordered.rs +++ b/tower-util/src/call_all/ordered.rs @@ -1,87 +1,88 @@ //! `Stream` + `Service` => `Stream`. use super::{common, Error}; -use futures::{stream::FuturesOrdered, Future, Poll, Stream}; +use futures_core::Stream; +use futures_util::stream::FuturesOrdered; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use tower_service::Service; /// This is a `futures::Stream` of responses resulting from calling the wrapped `tower::Service` /// for each request received on the wrapped `Stream`. /// /// ```rust -/// # extern crate futures; -/// # extern crate tower_service; -/// # extern crate tokio_mock_task; -/// # extern crate tower; -/// # use futures::future::{ok, FutureResult}; -/// # use futures::{Async, Poll}; +/// # use std::task::{Poll, Context}; /// # use std::cell::Cell; /// # use std::error::Error; /// # use std::rc::Rc; /// # -/// use futures::Stream; +/// use futures_util::future::{ready, Ready}; /// use tower_service::Service; /// use tower::ServiceExt; +/// use tokio::prelude::*; /// /// // First, we need to have a Service to process our requests. /// #[derive(Debug, Eq, PartialEq)] /// struct FirstLetter; /// impl Service<&'static str> for FirstLetter { /// type Response = &'static str; -/// type Error = Box; -/// type Future = FutureResult; +/// type Error = Box; +/// type Future = Ready>; /// -/// fn poll_ready(&mut self) -> Poll<(), Self::Error> { -/// Ok(Async::Ready(())) +/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { +/// Poll::Ready(Ok(())) /// } /// /// fn call(&mut self, req: &'static str) -> Self::Future { -/// ok(&req[..1]) +/// ready(Ok(&req[..1])) /// } /// } /// -/// # fn main() { -/// # let mut mock = tokio_mock_task::MockTask::new(); -/// // Next, we need a Stream of requests. -/// let (reqs, rx) = futures::unsync::mpsc::unbounded(); -/// // Note that we have to help Rust out here by telling it what error type to use. -/// // Specifically, it has to be From + From. -/// let rsps = FirstLetter.call_all(rx.map_err(|_| "boom")); +/// #[tokio::main] +/// async fn main() { +/// // Next, we need a Stream of requests. +/// let (mut reqs, rx) = tokio::sync::mpsc::unbounded_channel(); +/// // Note that we have to help Rust out here by telling it what error type to use. +/// // Specifically, it has to be From + From. +/// let mut rsps = FirstLetter.call_all(rx); /// -/// // Now, let's send a few requests and then check that we get the corresponding responses. -/// reqs.unbounded_send("one"); -/// reqs.unbounded_send("two"); -/// reqs.unbounded_send("three"); -/// drop(reqs); +/// // Now, let's send a few requests and then check that we get the corresponding responses. +/// reqs.try_send("one"); +/// reqs.try_send("two"); +/// reqs.try_send("three"); +/// drop(reqs); /// -/// // We then loop over the response Strem that we get back from call_all. -/// # // a little bit of trickery here since we don't have an executor -/// # /* -/// let mut iter = rsps.wait(); -/// # */ -/// # let mut iter = mock.enter(|| rsps.wait()); -/// # for (i, rsp) in (&mut iter).enumerate() { -/// // Since we used .wait(), each response is a Result. -/// match (i + 1, rsp.unwrap()) { -/// (1, "o") | -/// (2, "t") | -/// (3, "t") => {} -/// (n, i) => { -/// unreachable!("{}. response was '{}'", n, i); +/// // We then loop over the response Strem that we get back from call_all. +/// let mut i = 0usize; +/// while let Some(rsp) = rsps.next().await { +/// // Each response is a Result (we could also have used TryStream::try_next) +/// match (i + 1, rsp.unwrap()) { +/// (1, "o") | +/// (2, "t") | +/// (3, "t") => {} +/// (n, i) => { +/// unreachable!("{}. response was '{}'", n, i); +/// } /// } +/// i += 1; /// } -/// } /// -/// // And at the end, we can get the Service back when there are no more requests. -/// let rsps = iter.into_inner(); -/// assert_eq!(rsps.into_inner(), FirstLetter); -/// # } +/// // And at the end, we can get the Service back when there are no more requests. +/// assert_eq!(rsps.into_inner(), FirstLetter); +/// } /// ``` +#[pin_project] #[derive(Debug)] pub struct CallAll where Svc: Service, S: Stream, { + #[pin] inner: common::CallAll>, } @@ -90,7 +91,6 @@ where Svc: Service, Svc::Error: Into, S: Stream, - S::Error: Into, { /// Create new `CallAll` combinator. /// @@ -103,10 +103,25 @@ where } /// Extract the wrapped `Service`. + /// + /// # Panics + /// + /// Panics if `take_service` was already called. pub fn into_inner(self) -> Svc { self.inner.into_inner() } + /// Extract the wrapped `Service`. + /// + /// This `CallAll` can no longer be used after this function has been called. + /// + /// # Panics + /// + /// Panics if `take_service` was already called. + pub fn take_service(mut self: Pin<&mut Self>) -> Svc { + self.project().inner.take_service() + } + /// Return responses as they are ready, regardless of the initial order. /// /// This function must be called before the stream is polled. @@ -122,28 +137,26 @@ where impl Stream for CallAll where Svc: Service, - Svc::Error: Into, + Error: From, S: Stream, - S::Error: Into, { - type Item = Svc::Response; - type Error = Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_next(cx) } } -impl common::Drive for FuturesOrdered { +impl common::Drive for FuturesOrdered { fn is_empty(&self) -> bool { FuturesOrdered::is_empty(self) } - fn push(&mut self, future: T) { + fn push(&mut self, future: F) { FuturesOrdered::push(self, future) } - fn poll(&mut self) -> Poll, T::Error> { - Stream::poll(self) + fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { + Stream::poll_next(Pin::new(self), cx) } } diff --git a/tower-util/src/call_all/unordered.rs b/tower-util/src/call_all/unordered.rs index 9469dba..4b7be7e 100644 --- a/tower-util/src/call_all/unordered.rs +++ b/tower-util/src/call_all/unordered.rs @@ -1,19 +1,28 @@ //! `Stream` + `Service` => `Stream`. use super::{common, Error}; -use futures::{stream::FuturesUnordered, Future, Poll, Stream}; +use futures_core::Stream; +use futures_util::stream::FuturesUnordered; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use tower_service::Service; /// A stream of responses received from the inner service in received order. /// /// Similar to `CallAll` except, instead of yielding responses in request order, /// responses are returned as they are available. +#[pin_project] #[derive(Debug)] pub struct CallAllUnordered where Svc: Service, S: Stream, { + #[pin] inner: common::CallAll>, } @@ -22,7 +31,6 @@ where Svc: Service, Svc::Error: Into, S: Stream, - S::Error: Into, { /// Create new `CallAllUnordered` combinator. /// @@ -36,36 +44,49 @@ where } /// Extract the wrapped `Service`. + /// + /// # Panics + /// + /// Panics if `take_service` was already called. pub fn into_inner(self) -> Svc { self.inner.into_inner() } + + /// Extract the wrapped `Service`. + /// + /// This `CallAll` can no longer be used after this function has been called. + /// + /// # Panics + /// + /// Panics if `take_service` was already called. + pub fn take_service(mut self: Pin<&mut Self>) -> Svc { + self.project().inner.take_service() + } } impl Stream for CallAllUnordered where Svc: Service, - Svc::Error: Into, + Error: From, S: Stream, - S::Error: Into, { - type Item = Svc::Response; - type Error = Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_next(cx) } } -impl common::Drive for FuturesUnordered { +impl common::Drive for FuturesUnordered { fn is_empty(&self) -> bool { FuturesUnordered::is_empty(self) } - fn push(&mut self, future: T) { + fn push(&mut self, future: F) { FuturesUnordered::push(self, future) } - fn poll(&mut self) -> Poll, T::Error> { - Stream::poll(self) + fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { + Stream::poll_next(Pin::new(self), cx) } } diff --git a/tower-util/src/either.rs b/tower-util/src/either.rs index 14e76ef..dcd3a5c 100644 --- a/tower-util/src/either.rs +++ b/tower-util/src/either.rs @@ -2,7 +2,13 @@ //! //! See `Either` documentation for more details. -use futures::{Future, Poll}; +use futures_util::ready; +use pin_project::{pin_project, project}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use tower_service::Service; /// Combine two different service types into a single type. @@ -10,10 +16,11 @@ use tower_service::Service; /// Both services must be of the same request, response, and error types. /// `Either` is useful for handling conditional branching in service middleware /// to different inner service types. +#[pin_project] #[derive(Clone, Debug)] pub enum Either { - A(A), - B(B), + A(#[pin] A), + B(#[pin] B), } type Error = Box; @@ -21,20 +28,20 @@ type Error = Box; impl Service for Either where A: Service, - A::Error: Into, + Error: From, B: Service, - B::Error: Into, + Error: From, { type Response = A::Response; type Error = Error; type Future = Either; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { use self::Either::*; match self { - A(service) => service.poll_ready().map_err(Into::into), - B(service) => service.poll_ready().map_err(Into::into), + A(service) => Poll::Ready(Ok(ready!(service.poll_ready(cx))?)), + B(service) => Poll::Ready(Ok(ready!(service.poll_ready(cx))?)), } } @@ -48,22 +55,21 @@ where } } -impl Future for Either +impl Future for Either where - A: Future, - A::Error: Into, - B: Future, - B::Error: Into, + A: Future>, + Error: From, + B: Future>, + Error: From, { - type Item = A::Item; - type Error = Error; + type Output = Result; - fn poll(&mut self) -> Poll { - use self::Either::*; - - match self { - A(fut) => fut.poll().map_err(Into::into), - B(fut) => fut.poll().map_err(Into::into), + #[project] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + #[project] + match self.project() { + Either::A(fut) => Poll::Ready(Ok(ready!(fut.poll(cx))?)), + Either::B(fut) => Poll::Ready(Ok(ready!(fut.poll(cx))?)), } } } diff --git a/tower-util/src/lib.rs b/tower-util/src/lib.rs index 4421bde..5c58991 100644 --- a/tower-util/src/lib.rs +++ b/tower-util/src/lib.rs @@ -1,31 +1,39 @@ -#![doc(html_root_url = "https://docs.rs/tower-util/0.1.0")] +#![doc(html_root_url = "https://docs.rs/tower-util/0.3.0-alpha.1")] #![deny(rust_2018_idioms)] #![allow(elided_lifetimes_in_paths)] //! Various utility types and functions that are generally with Tower. -//mod boxed; -//mod call_all; -//mod either; -//pub mod layer; -//mod oneshot; -//mod optional; -//mod ready; -//mod sealed; +mod boxed; +mod call_all; +mod either; +pub mod layer; +mod oneshot; +mod optional; +mod ready; +mod sealed; mod service_fn; -pub use crate::service_fn::{service_fn, ServiceFn}; +pub use crate::{ + boxed::{BoxService, UnsyncBoxService}, + call_all::{CallAll, CallAllUnordered}, + either::Either, + oneshot::Oneshot, + optional::Optional, + ready::Ready, + service_fn::{service_fn, ServiceFn}, +}; -//pub mod error { -// //! Error types -// -// pub use crate::optional::error as optional; -//} -// -//pub mod future { -// //! Future types -// -// #[cfg(feature = "either")] -// pub use crate::either::future as either; -// pub use crate::optional::future as optional; -//} +pub mod error { + //! Error types + + pub use crate::optional::error as optional; +} + +pub mod future { + //! Future types + + #[cfg(feature = "either")] + pub use crate::either::future as either; + pub use crate::optional::future as optional; +} diff --git a/tower-util/src/oneshot.rs b/tower-util/src/oneshot.rs index 9c53e54..d7d05f6 100644 --- a/tower-util/src/oneshot.rs +++ b/tower-util/src/oneshot.rs @@ -1,19 +1,26 @@ -use std::mem; - -use futures::{Async, Future, Poll}; +use futures_util::ready; +use pin_project::{pin_project, project}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use tower_service::Service; /// A `Future` consuming a `Service` and request, waiting until the `Service` /// is ready, and then calling `Service::call` with the request, and /// waiting for that `Future`. +#[pin_project] pub struct Oneshot, Req> { + #[pin] state: State, } +#[pin_project] enum State, Req> { - NotReady(S, Req), - Called(S::Future), - Tmp, + NotReady(Option<(S, Req)>), + Called(#[pin] S::Future), + Done, } impl Oneshot @@ -22,7 +29,7 @@ where { pub fn new(svc: S, req: Req) -> Self { Oneshot { - state: State::NotReady(svc, req), + state: State::NotReady(Some((svc, req))), } } } @@ -31,31 +38,25 @@ impl Future for Oneshot where S: Service, { - type Item = S::Response; - type Error = S::Error; + type Output = Result; - fn poll(&mut self) -> Poll { + #[project] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); loop { - match mem::replace(&mut self.state, State::Tmp) { - State::NotReady(mut svc, req) => match svc.poll_ready()? { - Async::Ready(()) => { - self.state = State::Called(svc.call(req)); - } - Async::NotReady => { - self.state = State::NotReady(svc, req); - return Ok(Async::NotReady); - } - }, - State::Called(mut fut) => match fut.poll()? { - Async::Ready(res) => { - return Ok(Async::Ready(res)); - } - Async::NotReady => { - self.state = State::Called(fut); - return Ok(Async::NotReady); - } - }, - State::Tmp => panic!("polled after complete"), + #[project] + match this.state.project() { + State::NotReady(nr) => { + let (mut svc, req) = nr.take().expect("We immediately transition to ::Called"); + let _ = ready!(svc.poll_ready(cx))?; + this.state.set(State::Called(svc.call(req))); + } + State::Called(fut) => { + let res = ready!(fut.poll(cx))?; + this.state.set(State::Done); + return Poll::Ready(Ok(res)); + } + State::Done => panic!("polled after complete"), } } } diff --git a/tower-util/src/optional/future.rs b/tower-util/src/optional/future.rs index ea48940..25d5f45 100644 --- a/tower-util/src/optional/future.rs +++ b/tower-util/src/optional/future.rs @@ -1,8 +1,16 @@ use super::{error, Error}; -use futures::{Future, Poll}; +use futures_util::ready; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; /// Response future returned by `Optional`. +#[pin_project] pub struct ResponseFuture { + #[pin] inner: Option, } @@ -12,18 +20,17 @@ impl ResponseFuture { } } -impl Future for ResponseFuture +impl Future for ResponseFuture where - T: Future, - T::Error: Into, + F: Future>, + Error: From, { - type Item = T::Item; - type Error = Error; + type Output = Result; - fn poll(&mut self) -> Poll { - match self.inner { - Some(ref mut inner) => inner.poll().map_err(Into::into), - None => Err(error::None::new().into()), + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project().inner.as_pin_mut() { + Some(inner) => Poll::Ready(Ok(ready!(inner.poll(cx))?)), + None => Poll::Ready(Err(error::None::new().into())), } } } diff --git a/tower-util/src/optional/mod.rs b/tower-util/src/optional/mod.rs index 90c29bd..d34b3b1 100644 --- a/tower-util/src/optional/mod.rs +++ b/tower-util/src/optional/mod.rs @@ -7,7 +7,7 @@ pub mod error; pub mod future; use self::{error::Error, future::ResponseFuture}; -use futures::Poll; +use std::task::{Context, Poll}; use tower_service::Service; /// Optionally forwards requests to an inner service. @@ -31,17 +31,20 @@ impl Optional { impl Service for Optional where T: Service, - T::Error: Into, + Error: From, { type Response = T::Response; type Error = Error; type Future = ResponseFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self.inner { - Some(ref mut inner) => inner.poll_ready().map_err(Into::into), + Some(ref mut inner) => match inner.poll_ready(cx) { + Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)), + Poll::Pending => Poll::Pending, + }, // None services are always ready - None => Ok(().into()), + None => Poll::Ready(Ok(())), } } diff --git a/tower-util/src/ready.rs b/tower-util/src/ready.rs index 4d1d03d..3ce57be 100644 --- a/tower-util/src/ready.rs +++ b/tower-util/src/ready.rs @@ -1,11 +1,18 @@ use std::{fmt, marker::PhantomData}; -use futures::{try_ready, Future, Poll}; +use futures_util::ready; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use tower_service::Service; /// Future yielding a `Service` once the service is ready to process a request /// /// `Ready` values are produced by `ServiceExt::ready`. +#[pin_project] pub struct Ready { inner: Option, _p: PhantomData Request>, @@ -27,18 +34,17 @@ impl Future for Ready where T: Service, { - type Item = T; - type Error = T::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - match self.inner { - Some(ref mut service) => { - let _ = try_ready!(service.poll_ready()); - } - None => panic!("called `poll` after future completed"), - } + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + ready!(this + .inner + .as_mut() + .expect("called `poll` after future completed") + .poll_ready(cx))?; - Ok(self.inner.take().unwrap().into()) + Poll::Ready(Ok(this.inner.take().unwrap())) } } diff --git a/tower-util/tests/call_all.rs b/tower-util/tests/call_all.rs index d79be6d..c4c5818 100644 --- a/tower-util/tests/call_all.rs +++ b/tower-util/tests/call_all.rs @@ -1,11 +1,11 @@ -#![cfg(feature = "broken")] - -use futures::{ - self, - future::{ok, FutureResult}, - stream, Async, Poll, Stream, +use futures_core::Stream; +use futures_util::{ + future::{ready, Ready}, + pin_mut, }; +use std::task::{Context, Poll}; use std::{cell::Cell, rc::Rc}; +use tokio_test::{assert_pending, assert_ready, task}; use tower::ServiceExt; use tower_service::*; use tower_test::{assert_request_eq, mock}; @@ -20,46 +20,26 @@ struct Srv { impl Service<&'static str> for Srv { type Response = &'static str; type Error = Error; - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { if !self.admit.get() { - return Ok(Async::NotReady); + return Poll::Pending; } self.admit.set(false); - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } fn call(&mut self, req: &'static str) -> Self::Future { self.count.set(self.count.get() + 1); - ok(req) + ready(Ok(req)) } } -macro_rules! assert_ready { - ($e:expr) => {{ - match $e { - Ok(futures::Async::Ready(v)) => v, - Ok(_) => panic!("not ready"), - Err(e) => panic!("error = {:?}", e), - } - }}; -} - -macro_rules! assert_not_ready { - ($e:expr) => {{ - match $e { - Ok(futures::Async::NotReady) => {} - Ok(futures::Async::Ready(v)) => panic!("ready; value = {:?}", v), - Err(e) => panic!("error = {:?}", e), - } - }}; -} - #[test] fn ordered() { - let mut mock = tokio_mock_task::MockTask::new(); + let mut mock = task::MockTask::new(); let admit = Rc::new(Cell::new(false)); let count = Rc::new(Cell::new(0)); @@ -67,49 +47,60 @@ fn ordered() { count: count.clone(), admit: admit.clone(), }; - let (tx, rx) = futures::unsync::mpsc::unbounded(); - let mut ca = srv.call_all(rx.map_err(|_| "nope")); + let (mut tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let ca = srv.call_all(rx); + pin_mut!(ca); - assert_not_ready!(mock.enter(|| ca.poll())); - tx.unbounded_send("one").unwrap(); - mock.is_notified(); - assert_not_ready!(mock.enter(|| ca.poll())); + assert_pending!(mock.enter(|cx| ca.as_mut().poll_next(cx))); + tx.try_send("one").unwrap(); + mock.is_woken(); + assert_pending!(mock.enter(|cx| ca.as_mut().poll_next(cx))); admit.set(true); - let v = assert_ready!(mock.enter(|| ca.poll())); + let v = assert_ready!(mock.enter(|cx| ca.as_mut().poll_next(cx))) + .transpose() + .unwrap(); assert_eq!(v, Some("one")); - assert_not_ready!(mock.enter(|| ca.poll())); + assert_pending!(mock.enter(|cx| ca.as_mut().poll_next(cx))); admit.set(true); - tx.unbounded_send("two").unwrap(); - mock.is_notified(); - tx.unbounded_send("three").unwrap(); - let v = assert_ready!(mock.enter(|| ca.poll())); + tx.try_send("two").unwrap(); + mock.is_woken(); + tx.try_send("three").unwrap(); + let v = assert_ready!(mock.enter(|cx| ca.as_mut().poll_next(cx))) + .transpose() + .unwrap(); assert_eq!(v, Some("two")); - assert_not_ready!(mock.enter(|| ca.poll())); + assert_pending!(mock.enter(|cx| ca.as_mut().poll_next(cx))); admit.set(true); - let v = assert_ready!(mock.enter(|| ca.poll())); + let v = assert_ready!(mock.enter(|cx| ca.as_mut().poll_next(cx))) + .transpose() + .unwrap(); assert_eq!(v, Some("three")); admit.set(true); - assert_not_ready!(mock.enter(|| ca.poll())); + assert_pending!(mock.enter(|cx| ca.as_mut().poll_next(cx))); admit.set(true); - tx.unbounded_send("four").unwrap(); - mock.is_notified(); - let v = assert_ready!(mock.enter(|| ca.poll())); + tx.try_send("four").unwrap(); + mock.is_woken(); + let v = assert_ready!(mock.enter(|cx| ca.as_mut().poll_next(cx))) + .transpose() + .unwrap(); assert_eq!(v, Some("four")); - assert_not_ready!(mock.enter(|| ca.poll())); + assert_pending!(mock.enter(|cx| ca.as_mut().poll_next(cx))); // need to be ready since impl doesn't know it'll get EOF admit.set(true); // When we drop the request stream, CallAll should return None. drop(tx); - mock.is_notified(); - let v = assert_ready!(mock.enter(|| ca.poll())); + mock.is_woken(); + let v = assert_ready!(mock.enter(|cx| ca.as_mut().poll_next(cx))) + .transpose() + .unwrap(); assert!(v.is_none()); assert_eq!(count.get(), 4); // We should also be able to recover the wrapped Service. assert_eq!( - ca.into_inner(), + ca.take_service(), Srv { count: count.clone(), admit @@ -119,27 +110,37 @@ fn ordered() { #[test] fn unordered() { - let (mock, mut handle) = mock::pair::<_, &'static str>(); - let mut task = tokio_mock_task::MockTask::new(); - let requests = stream::iter_ok::<_, Error>(&["one", "two"]); + let (mock, handle) = mock::pair::<_, &'static str>(); + pin_mut!(handle); - let mut svc = mock.call_all(requests).unordered(); - assert_not_ready!(task.enter(|| svc.poll())); + let mut task = task::MockTask::new(); + let requests = futures_util::stream::iter(&["one", "two"]); + + let svc = mock.call_all(requests).unordered(); + pin_mut!(svc); + + assert_pending!(task.enter(|cx| svc.as_mut().poll_next(cx))); let resp1 = assert_request_eq!(handle, &"one"); let resp2 = assert_request_eq!(handle, &"two"); resp2.send_response("resp 1"); - let v = assert_ready!(task.enter(|| svc.poll())); + let v = assert_ready!(task.enter(|cx| svc.as_mut().poll_next(cx))) + .transpose() + .unwrap(); assert_eq!(v, Some("resp 1")); - assert_not_ready!(task.enter(|| svc.poll())); + assert_pending!(task.enter(|cx| svc.as_mut().poll_next(cx))); resp1.send_response("resp 2"); - let v = assert_ready!(task.enter(|| svc.poll())); + let v = assert_ready!(task.enter(|cx| svc.as_mut().poll_next(cx))) + .transpose() + .unwrap(); assert_eq!(v, Some("resp 2")); - let v = assert_ready!(task.enter(|| svc.poll())); + let v = assert_ready!(task.enter(|cx| svc.as_mut().poll_next(cx))) + .transpose() + .unwrap(); assert!(v.is_none()); } diff --git a/tower-util/tests/service_fn.rs b/tower-util/tests/service_fn.rs index b19f287..d1349c1 100644 --- a/tower-util/tests/service_fn.rs +++ b/tower-util/tests/service_fn.rs @@ -1,13 +1,11 @@ -use futures::executor::block_on; -use futures::future; +use futures_util::future::ready; +use tokio_test::block_on; use tower_service::Service; use tower_util::service_fn; #[test] fn simple() { - block_on(async { - let mut add_one = service_fn(|req| future::ok::<_, ()>(req + 1)); - let answer = add_one.call(1).await.unwrap(); - assert_eq!(answer, 2); - }); + let mut add_one = service_fn(|req| ready(Ok::<_, ()>(req + 1))); + let answer = block_on(add_one.call(1)).unwrap(); + assert_eq!(answer, 2); } diff --git a/tower/CHANGELOG.md b/tower/CHANGELOG.md index f26ad9f..8f2edbd 100644 --- a/tower/CHANGELOG.md +++ b/tower/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.3.0-alpha.1 + +- Move to `std::future` + # 0.1.1 (July 19, 2019) - Add `ServiceBuilder::into_inner` diff --git a/tower/Cargo.toml b/tower/Cargo.toml index f72a8c0..b4ac37b 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -8,13 +8,13 @@ name = "tower" # - README.md # - Update CHANGELOG.md. # - Create "v0.1.x" git tag. -version = "0.1.1" +version = "0.3.0-alpha.1" authors = ["Tower Maintainers "] license = "MIT" readme = "README.md" repository = "https://github.com/tower-rs/tower" homepage = "https://github.com/tower-rs/tower" -documentation = "https://docs.rs/tower/0.1.1" +documentation = "https://docs.rs/tower/0.3.0-alpha.1" description = """ Tower is a library of modular and reusable components for building robust clients and servers. @@ -28,20 +28,21 @@ default = ["full"] full = [] [dependencies] -futures = "0.1.26" -tower-buffer = "0.1.0" -tower-discover = "0.1.0" -tower-layer = "0.1.0" -tower-limit = "0.1.0" -tower-load-shed = "0.1.0" -tower-retry = "0.1.0" -tower-service = "0.2.0" -tower-timeout = "0.1.0" -tower-util = { version = "0.1.0", features = ["io"] } +tower-buffer = { version = "0.3.0-alpha.1", path = "../tower-buffer" } +tower-discover = { version = "0.3.0-alpha.1", path = "../tower-discover" } +tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" } +tower-limit = { version = "0.3.0-alpha.1", path = "../tower-limit" } +tower-load-shed = { version = "0.3.0-alpha.1", path = "../tower-load-shed" } +tower-retry = { version = "0.3.0-alpha.1", path = "../tower-retry" } +tower-service = "0.3.0-alpha.1" +tower-timeout = { version = "0.3.0-alpha.1", path = "../tower-timeout" } +#tower-util = { version = "0.3.0-alpha.1", path = "../tower-util", features = ["io"] } +tower-util = { version = "0.3.0-alpha.1", path = "../tower-util" } +futures-core-preview = "0.3.0-alpha.18" [dev-dependencies] -futures = "0.1.26" -log = "0.4.1" -tokio = "0.1" env_logger = { version = "0.5.3", default-features = false } -void = "1.0.2" +futures-util-preview = "0.3.0-alpha.18" +log = "0.4.1" +tokio = "0.2.0-alpha.1" +tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" } diff --git a/tower/src/builder/mod.rs b/tower/src/builder/mod.rs index b2718c6..f887a8c 100644 --- a/tower/src/builder/mod.rs +++ b/tower/src/builder/mod.rs @@ -40,15 +40,11 @@ use std::{fmt, time::Duration}; /// ``` /// # use tower::Service; /// # use tower::builder::ServiceBuilder; -/// # fn dox(my_service: T) -/// # where T: Service<()> + Send + 'static, -/// # T::Future: Send, -/// # T::Error: Into>, -/// # { +/// # async fn wrap(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send { /// ServiceBuilder::new() /// .buffer(100) /// .concurrency_limit(10) -/// .service(my_service) +/// .service(svc) /// # ; /// # } /// ``` @@ -62,15 +58,11 @@ use std::{fmt, time::Duration}; /// ``` /// # use tower::Service; /// # use tower::builder::ServiceBuilder; -/// # fn dox(my_service: T) -/// # where T: Service<()> + Send + 'static, -/// # T::Future: Send, -/// # T::Error: Into>, -/// # { +/// # async fn wrap(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send { /// ServiceBuilder::new() /// .concurrency_limit(10) /// .buffer(100) -/// .service(my_service) +/// .service(svc) /// # ; /// # } /// ``` @@ -84,63 +76,32 @@ use std::{fmt, time::Duration}; /// A `Service` stack with a single layer: /// /// ``` -/// # extern crate tower; -/// # extern crate tower_limit; -/// # extern crate futures; -/// # extern crate void; -/// # use void::Void; /// # use tower::Service; /// # use tower::builder::ServiceBuilder; /// # use tower_limit::concurrency::ConcurrencyLimitLayer; -/// # use futures::{Poll, future::{self, FutureResult}}; -/// # #[derive(Debug)] -/// # struct MyService; -/// # impl Service<()> for MyService { -/// # type Response = (); -/// # type Error = Void; -/// # type Future = FutureResult; -/// # fn poll_ready(&mut self) -> Poll<(), Self::Error> { -/// # Ok(().into()) -/// # } -/// # fn call(&mut self, _: ()) -> Self::Future { -/// # future::ok(()) -/// # } -/// # } +/// # async fn wrap(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send { /// ServiceBuilder::new() /// .concurrency_limit(5) -/// .service(MyService); +/// .service(svc); +/// # ; +/// # } /// ``` /// /// A `Service` stack with _multiple_ layers that contain rate limiting, /// in-flight request limits, and a channel-backed, clonable `Service`: /// /// ``` -/// # extern crate tower; -/// # extern crate futures; -/// # extern crate void; -/// # use void::Void; /// # use tower::Service; /// # use tower::builder::ServiceBuilder; /// # use std::time::Duration; -/// # use futures::{Poll, future::{self, FutureResult}}; -/// # #[derive(Debug)] -/// # struct MyService; -/// # impl Service<()> for MyService { -/// # type Response = (); -/// # type Error = Void; -/// # type Future = FutureResult; -/// # fn poll_ready(&mut self) -> Poll<(), Self::Error> { -/// # Ok(().into()) -/// # } -/// # fn call(&mut self, _: ()) -> Self::Future { -/// # future::ok(()) -/// # } -/// # } +/// # async fn wrap(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send { /// ServiceBuilder::new() /// .buffer(5) /// .concurrency_limit(5) /// .rate_limit(5, Duration::from_secs(1)) -/// .service(MyService); +/// .service(svc); +/// # ; +/// # } /// ``` #[derive(Clone)] pub struct ServiceBuilder { diff --git a/tower/src/lib.rs b/tower/src/lib.rs index 5b0648b..7fcc440 100644 --- a/tower/src/lib.rs +++ b/tower/src/lib.rs @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/tower/0.1.1")] +#![doc(html_root_url = "https://docs.rs/tower/0.3.0-alpha.1")] // Allows refining features in the future without breaking backwards // compatibility #![cfg(feature = "full")] @@ -30,4 +30,4 @@ pub mod util; pub use crate::{builder::ServiceBuilder, util::ServiceExt}; pub use tower_service::Service; -pub use tower_util::{service_fn, MakeConnection, MakeService}; +pub use tower_util::service_fn; diff --git a/tower/src/util.rs b/tower/src/util.rs index 1f5eeda..237188f 100644 --- a/tower/src/util.rs +++ b/tower/src/util.rs @@ -1,6 +1,6 @@ //! Combinators for working with `Service`s -use futures::Stream; +use futures_core::Stream; use tower_service::Service; pub use tower_util::{ BoxService, CallAll, CallAllUnordered, Either, Oneshot, Optional, Ready, UnsyncBoxService, @@ -38,7 +38,6 @@ pub trait ServiceExt: Service { Self: Sized, Self::Error: Into, S: Stream, - S::Error: Into, { CallAll::new(self, reqs) } diff --git a/tower/tests/builder.rs b/tower/tests/builder.rs index 3d60ceb..321477b 100644 --- a/tower/tests/builder.rs +++ b/tower/tests/builder.rs @@ -1,69 +1,52 @@ -use futures::{ - future::{self, FutureResult}, - prelude::*, -}; use std::time::Duration; +use futures_util::{pin_mut, future::{Ready, poll_fn}}; use tower::builder::ServiceBuilder; +use tower::util::ServiceExt; use tower_buffer::BufferLayer; use tower_limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer}; use tower_retry::{Policy, RetryLayer}; use tower_service::*; -use void::Void; +use tower_test::{mock}; + +#[tokio::test] +async fn builder_service() { + let (service, handle) = mock::pair(); + pin_mut!(handle); -#[test] -fn builder_service() { - tokio::run(future::lazy(|| { let policy = MockPolicy; - let mut client = ServiceBuilder::new() + let client = ServiceBuilder::new() .layer(BufferLayer::new(5)) .layer(ConcurrencyLimitLayer::new(5)) .layer(RateLimitLayer::new(5, Duration::from_secs(1))) .layer(RetryLayer::new(policy)) .layer(BufferLayer::new(5)) - .service(MockSvc); + .service(service); - client.poll_ready().unwrap(); - client - .call(Request) - .map(|_| ()) - .map_err(|_| panic!("this is bad")) - })); -} + // allow a request through + handle.allow(1); -#[derive(Debug, Clone)] -struct Request; -#[derive(Debug, Clone)] -struct Response; -#[derive(Debug)] -struct MockSvc; -impl Service for MockSvc { - type Response = Response; - type Error = Void; - type Future = FutureResult; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(().into()) - } - - fn call(&mut self, _: Request) -> Self::Future { - future::ok(Response) - } + let mut client = client.ready().await.unwrap(); + let fut = client.call("hello"); + let (request, rsp) = poll_fn(|cx| handle.as_mut().poll_request(cx)).await.unwrap(); + assert_eq!(request, "hello"); + rsp.send_response("world"); + assert_eq!(fut.await.unwrap(), "world"); } #[derive(Debug, Clone)] struct MockPolicy; -impl Policy for MockPolicy +impl Policy<&'static str, &'static str, E> for MockPolicy where E: Into>, { - type Future = FutureResult; + type Future = Ready; - fn retry(&self, _req: &Request, _result: Result<&Response, &E>) -> Option { + fn retry(&self, _req: &&'static str, _result: Result<&&'static str, &E>) -> Option { None } - fn clone_request(&self, req: &Request) -> Option { + fn clone_request(&self, req: &&'static str) -> Option<&'static str> { Some(req.clone()) } }