Update tower-util and tower to std::future (#330)
This bumps tower-util and tower to 0.3.0-alpha.1
This commit is contained in:
parent
9691d0d379
commit
0802ca2bce
|
@ -1,7 +1,7 @@
|
|||
[workspace]
|
||||
|
||||
members = [
|
||||
# "tower",
|
||||
"tower",
|
||||
# "tower-balance",
|
||||
"tower-buffer",
|
||||
"tower-discover",
|
||||
|
|
|
@ -27,4 +27,4 @@ futures-executor-preview = "0.3.0-alpha.18"
|
|||
tokio-test = "0.2.0-alpha.2"
|
||||
tokio-sync = "0.2.0-alpha.2"
|
||||
tower-service = "0.3.0-alpha.1"
|
||||
pin-utils = "0.1.0-alpha.4"
|
||||
pin-project = "0.4.0-alpha.10"
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
//! Future types
|
||||
|
||||
use crate::mock::error::{self, Error};
|
||||
use futures_util::ready;
|
||||
use pin_project::pin_project;
|
||||
use tokio_sync::oneshot;
|
||||
|
||||
use std::{
|
||||
|
@ -10,16 +12,16 @@ use std::{
|
|||
};
|
||||
|
||||
/// Future of the `Mock` response.
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct ResponseFuture<T> {
|
||||
#[pin]
|
||||
rx: Option<Rx<T>>,
|
||||
}
|
||||
|
||||
type Rx<T> = oneshot::Receiver<Result<T, Error>>;
|
||||
|
||||
impl<T> ResponseFuture<T> {
|
||||
pin_utils::unsafe_pinned!(rx: Option<Rx<T>>);
|
||||
|
||||
pub(crate) fn new(rx: Rx<T>) -> ResponseFuture<T> {
|
||||
ResponseFuture { rx: Some(rx) }
|
||||
}
|
||||
|
@ -32,13 +34,11 @@ impl<T> ResponseFuture<T> {
|
|||
impl<T> Future for ResponseFuture<T> {
|
||||
type Output = Result<T, Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
match self.rx().as_pin_mut() {
|
||||
Some(rx) => match rx.poll(cx) {
|
||||
Poll::Ready(Ok(Ok(v))) => Poll::Ready(Ok(v)),
|
||||
Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e)),
|
||||
Poll::Ready(Err(_)) => Poll::Ready(Err(error::Closed::new().into())),
|
||||
Poll::Pending => Poll::Pending,
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
match self.project().rx.as_pin_mut() {
|
||||
Some(rx) => match ready!(rx.poll(cx)) {
|
||||
Ok(r) => Poll::Ready(r),
|
||||
Err(_) => Poll::Ready(Err(error::Closed::new().into())),
|
||||
},
|
||||
None => Poll::Ready(Err(error::Closed::new().into())),
|
||||
}
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
# 0.3.0-alpha.1
|
||||
|
||||
- Move to `std::future`
|
||||
|
||||
# 0.1.0 (April 26, 2019)
|
||||
|
||||
- Initial release
|
||||
|
|
|
@ -15,7 +15,7 @@ license = "MIT"
|
|||
readme = "README.md"
|
||||
repository = "https://github.com/tower-rs/tower"
|
||||
homepage = "https://github.com/tower-rs/tower"
|
||||
documentation = "https://docs.rs/tower-util/0.1.0"
|
||||
documentation = "https://docs.rs/tower-util/0.3.0-alpha.1"
|
||||
description = """
|
||||
Utilities for working with `Service`.
|
||||
"""
|
||||
|
@ -24,10 +24,14 @@ edition = "2018"
|
|||
|
||||
[dependencies]
|
||||
tower-service = "=0.3.0-alpha.1"
|
||||
# tower-layer = "0.1.0"
|
||||
tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" }
|
||||
pin-project = { version = "0.4.0-alpha.10", features = ["project_attr"] }
|
||||
futures-util-preview = "0.3.0-alpha.18"
|
||||
futures-core-preview = "0.3.0-alpha.18"
|
||||
|
||||
[dev-dependencies]
|
||||
futures = { version = "=0.3.0-alpha.18", package = "futures-preview"}
|
||||
# tokio-mock-task = "0.1.1"
|
||||
# tower = { version = "0.1.0", path = "../tower" }
|
||||
# tower-test = { version = "0.1.0", path = "../tower-test" }
|
||||
futures-util-preview = "0.3.0-alpha.18"
|
||||
tokio-test = "0.2.0-alpha.1"
|
||||
tokio = "0.2.0-alpha.1"
|
||||
tower = { version = "0.3.0-alpha.1", path = "../tower" }
|
||||
tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" }
|
||||
|
|
|
@ -14,18 +14,14 @@
|
|||
//! # Examples
|
||||
//!
|
||||
//! ```
|
||||
//! # extern crate futures;
|
||||
//! # extern crate tower_service;
|
||||
//! # extern crate tower_util;
|
||||
//! # use futures::*;
|
||||
//! # use futures::future::FutureResult;
|
||||
//! use futures_util::future::ready;
|
||||
//! # use tower_service::Service;
|
||||
//! # use tower_util::{BoxService, service_fn};
|
||||
//! // Respond to requests using a closure, but closures cannot be named...
|
||||
//! # pub fn main() {
|
||||
//! let svc = service_fn(|mut request: String| {
|
||||
//! request.push_str(" response");
|
||||
//! Ok(request)
|
||||
//! ready(Ok(request))
|
||||
//! });
|
||||
//!
|
||||
//! let service: BoxService<String, String, ()> = BoxService::new(svc);
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
use futures::{Future, Poll};
|
||||
use tower_service::Service;
|
||||
|
||||
use std::fmt;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
/// A boxed `Service + Send` trait object.
|
||||
///
|
||||
|
@ -18,7 +22,7 @@ pub struct BoxService<T, U, E> {
|
|||
///
|
||||
/// This type alias represents a boxed future that is `Send` and can be moved
|
||||
/// across threads.
|
||||
type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>;
|
||||
type BoxFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Boxed<S> {
|
||||
|
@ -41,8 +45,8 @@ impl<T, U, E> Service<T> for BoxService<T, U, E> {
|
|||
type Error = E;
|
||||
type Future = BoxFuture<U, E>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), E> {
|
||||
self.inner.poll_ready()
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, request: T) -> BoxFuture<U, E> {
|
||||
|
@ -68,13 +72,13 @@ where
|
|||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = Box<dyn Future<Item = S::Response, Error = S::Error> + Send>;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>> + Send>>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.inner.poll_ready()
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, request: Request) -> Self::Future {
|
||||
Box::new(self.inner.call(request))
|
||||
Box::pin(self.inner.call(request))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
use futures::{Future, Poll};
|
||||
use tower_service::Service;
|
||||
|
||||
use std::fmt;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
/// A boxed `Service` trait object.
|
||||
pub struct UnsyncBoxService<T, U, E> {
|
||||
|
@ -12,7 +16,7 @@ pub struct UnsyncBoxService<T, U, E> {
|
|||
///
|
||||
/// This type alias represents a boxed future that is *not* `Send` and must
|
||||
/// remain on the current thread.
|
||||
type UnsyncBoxFuture<T, E> = Box<dyn Future<Item = T, Error = E>>;
|
||||
type UnsyncBoxFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>>>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct UnsyncBoxed<S> {
|
||||
|
@ -35,8 +39,8 @@ impl<T, U, E> Service<T> for UnsyncBoxService<T, U, E> {
|
|||
type Error = E;
|
||||
type Future = UnsyncBoxFuture<U, E>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), E> {
|
||||
self.inner.poll_ready()
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, request: T) -> UnsyncBoxFuture<U, E> {
|
||||
|
@ -62,13 +66,13 @@ where
|
|||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = Box<dyn Future<Item = S::Response, Error = S::Error>>;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>>>>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.inner.poll_ready()
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, request: Request) -> Self::Future {
|
||||
Box::new(self.inner.call(request))
|
||||
Box::pin(self.inner.call(request))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,22 +1,31 @@
|
|||
use super::Error;
|
||||
use futures::{try_ready, Async, Future, Poll, Stream};
|
||||
use futures_core::{ready, Stream};
|
||||
use pin_project::pin_project;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
/// TODO: Dox
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct CallAll<Svc, S, Q> {
|
||||
service: Svc,
|
||||
service: Option<Svc>,
|
||||
#[pin]
|
||||
stream: S,
|
||||
queue: Q,
|
||||
eof: bool,
|
||||
}
|
||||
|
||||
pub(crate) trait Drive<T: Future> {
|
||||
pub(crate) trait Drive<F: Future> {
|
||||
fn is_empty(&self) -> bool;
|
||||
|
||||
fn push(&mut self, future: T);
|
||||
fn push(&mut self, future: F);
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error>;
|
||||
// NOTE: this implicitly requires Self: Unpin just like Service does
|
||||
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>;
|
||||
}
|
||||
|
||||
impl<Svc, S, Q> CallAll<Svc, S, Q>
|
||||
|
@ -24,12 +33,11 @@ where
|
|||
Svc: Service<S::Item>,
|
||||
Svc::Error: Into<Error>,
|
||||
S: Stream,
|
||||
S::Error: Into<Error>,
|
||||
Q: Drive<Svc::Future>,
|
||||
{
|
||||
pub(crate) fn new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q> {
|
||||
CallAll {
|
||||
service,
|
||||
service: Some(service),
|
||||
stream,
|
||||
queue,
|
||||
eof: false,
|
||||
|
@ -37,59 +45,73 @@ where
|
|||
}
|
||||
|
||||
/// Extract the wrapped `Service`.
|
||||
pub(crate) fn into_inner(self) -> Svc {
|
||||
self.service
|
||||
pub(crate) fn into_inner(mut self) -> Svc {
|
||||
self.service.take().expect("Service already taken")
|
||||
}
|
||||
|
||||
pub(crate) fn unordered(self) -> super::CallAllUnordered<Svc, S> {
|
||||
/// Extract the wrapped `Service`.
|
||||
pub(crate) fn take_service(mut self: Pin<&mut Self>) -> Svc {
|
||||
self.project()
|
||||
.service
|
||||
.take()
|
||||
.expect("Service already taken")
|
||||
}
|
||||
|
||||
pub(crate) fn unordered(mut self) -> super::CallAllUnordered<Svc, S> {
|
||||
assert!(self.queue.is_empty() && !self.eof);
|
||||
|
||||
super::CallAllUnordered::new(self.service, self.stream)
|
||||
super::CallAllUnordered::new(self.service.take().unwrap(), self.stream)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Svc, S, Q> Stream for CallAll<Svc, S, Q>
|
||||
where
|
||||
Svc: Service<S::Item>,
|
||||
Svc::Error: Into<Error>,
|
||||
Error: From<Svc::Error>,
|
||||
S: Stream,
|
||||
S::Error: Into<Error>,
|
||||
Q: Drive<Svc::Future>,
|
||||
{
|
||||
type Item = Svc::Response;
|
||||
type Error = Error;
|
||||
type Item = Result<Svc::Response, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.project();
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
loop {
|
||||
let res = self.queue.poll().map_err(Into::into);
|
||||
|
||||
// First, see if we have any responses to yield
|
||||
if let Async::Ready(Some(rsp)) = res? {
|
||||
return Ok(Async::Ready(Some(rsp)));
|
||||
if let Poll::Ready(r) = this.queue.poll(cx) {
|
||||
if let Some(rsp) = r.transpose()? {
|
||||
return Poll::Ready(Some(Ok(rsp)));
|
||||
}
|
||||
}
|
||||
|
||||
// If there are no more requests coming, check if we're done
|
||||
if self.eof {
|
||||
if self.queue.is_empty() {
|
||||
return Ok(Async::Ready(None));
|
||||
if *this.eof {
|
||||
if this.queue.is_empty() {
|
||||
return Poll::Ready(None);
|
||||
} else {
|
||||
return Ok(Async::NotReady);
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
|
||||
// Then, see that the service is ready for another request
|
||||
try_ready!(self.service.poll_ready().map_err(Into::into));
|
||||
let svc = this
|
||||
.service
|
||||
.as_mut()
|
||||
.expect("Using CallAll after extracing inner Service");
|
||||
ready!(svc.poll_ready(cx))?;
|
||||
|
||||
// If it is, gather the next request (if there is one)
|
||||
match self.stream.poll().map_err(Into::into)? {
|
||||
Async::Ready(Some(req)) => {
|
||||
self.queue.push(self.service.call(req));
|
||||
}
|
||||
Async::Ready(None) => {
|
||||
// We're all done once any outstanding requests have completed
|
||||
self.eof = true;
|
||||
}
|
||||
Async::NotReady => {
|
||||
match this.stream.as_mut().poll_next(cx) {
|
||||
Poll::Ready(r) => match r {
|
||||
Some(req) => {
|
||||
this.queue.push(svc.call(req));
|
||||
}
|
||||
None => {
|
||||
// We're all done once any outstanding requests have completed
|
||||
*this.eof = true;
|
||||
}
|
||||
},
|
||||
Poll::Pending => {
|
||||
// TODO: We probably want to "release" the slot we reserved in Svc here.
|
||||
// It may be a while until we get around to actually using it.
|
||||
}
|
||||
|
|
|
@ -1,87 +1,88 @@
|
|||
//! `Stream<Item = Request>` + `Service<Request>` => `Stream<Item = Response>`.
|
||||
|
||||
use super::{common, Error};
|
||||
use futures::{stream::FuturesOrdered, Future, Poll, Stream};
|
||||
use futures_core::Stream;
|
||||
use futures_util::stream::FuturesOrdered;
|
||||
use pin_project::pin_project;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
/// This is a `futures::Stream` of responses resulting from calling the wrapped `tower::Service`
|
||||
/// for each request received on the wrapped `Stream`.
|
||||
///
|
||||
/// ```rust
|
||||
/// # extern crate futures;
|
||||
/// # extern crate tower_service;
|
||||
/// # extern crate tokio_mock_task;
|
||||
/// # extern crate tower;
|
||||
/// # use futures::future::{ok, FutureResult};
|
||||
/// # use futures::{Async, Poll};
|
||||
/// # use std::task::{Poll, Context};
|
||||
/// # use std::cell::Cell;
|
||||
/// # use std::error::Error;
|
||||
/// # use std::rc::Rc;
|
||||
/// #
|
||||
/// use futures::Stream;
|
||||
/// use futures_util::future::{ready, Ready};
|
||||
/// use tower_service::Service;
|
||||
/// use tower::ServiceExt;
|
||||
/// use tokio::prelude::*;
|
||||
///
|
||||
/// // First, we need to have a Service to process our requests.
|
||||
/// #[derive(Debug, Eq, PartialEq)]
|
||||
/// struct FirstLetter;
|
||||
/// impl Service<&'static str> for FirstLetter {
|
||||
/// type Response = &'static str;
|
||||
/// type Error = Box<Error + Send + Sync>;
|
||||
/// type Future = FutureResult<Self::Response, Self::Error>;
|
||||
/// type Error = Box<dyn Error + Send + Sync>;
|
||||
/// type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||
///
|
||||
/// fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
/// Ok(Async::Ready(()))
|
||||
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
/// Poll::Ready(Ok(()))
|
||||
/// }
|
||||
///
|
||||
/// fn call(&mut self, req: &'static str) -> Self::Future {
|
||||
/// ok(&req[..1])
|
||||
/// ready(Ok(&req[..1]))
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// # fn main() {
|
||||
/// # let mut mock = tokio_mock_task::MockTask::new();
|
||||
/// // Next, we need a Stream of requests.
|
||||
/// let (reqs, rx) = futures::unsync::mpsc::unbounded();
|
||||
/// // Note that we have to help Rust out here by telling it what error type to use.
|
||||
/// // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
|
||||
/// let rsps = FirstLetter.call_all(rx.map_err(|_| "boom"));
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// // Next, we need a Stream of requests.
|
||||
/// let (mut reqs, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
/// // Note that we have to help Rust out here by telling it what error type to use.
|
||||
/// // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
|
||||
/// let mut rsps = FirstLetter.call_all(rx);
|
||||
///
|
||||
/// // Now, let's send a few requests and then check that we get the corresponding responses.
|
||||
/// reqs.unbounded_send("one");
|
||||
/// reqs.unbounded_send("two");
|
||||
/// reqs.unbounded_send("three");
|
||||
/// drop(reqs);
|
||||
/// // Now, let's send a few requests and then check that we get the corresponding responses.
|
||||
/// reqs.try_send("one");
|
||||
/// reqs.try_send("two");
|
||||
/// reqs.try_send("three");
|
||||
/// drop(reqs);
|
||||
///
|
||||
/// // We then loop over the response Strem that we get back from call_all.
|
||||
/// # // a little bit of trickery here since we don't have an executor
|
||||
/// # /*
|
||||
/// let mut iter = rsps.wait();
|
||||
/// # */
|
||||
/// # let mut iter = mock.enter(|| rsps.wait());
|
||||
/// # for (i, rsp) in (&mut iter).enumerate() {
|
||||
/// // Since we used .wait(), each response is a Result.
|
||||
/// match (i + 1, rsp.unwrap()) {
|
||||
/// (1, "o") |
|
||||
/// (2, "t") |
|
||||
/// (3, "t") => {}
|
||||
/// (n, i) => {
|
||||
/// unreachable!("{}. response was '{}'", n, i);
|
||||
/// // We then loop over the response Strem that we get back from call_all.
|
||||
/// let mut i = 0usize;
|
||||
/// while let Some(rsp) = rsps.next().await {
|
||||
/// // Each response is a Result (we could also have used TryStream::try_next)
|
||||
/// match (i + 1, rsp.unwrap()) {
|
||||
/// (1, "o") |
|
||||
/// (2, "t") |
|
||||
/// (3, "t") => {}
|
||||
/// (n, i) => {
|
||||
/// unreachable!("{}. response was '{}'", n, i);
|
||||
/// }
|
||||
/// }
|
||||
/// i += 1;
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// // And at the end, we can get the Service back when there are no more requests.
|
||||
/// let rsps = iter.into_inner();
|
||||
/// assert_eq!(rsps.into_inner(), FirstLetter);
|
||||
/// # }
|
||||
/// // And at the end, we can get the Service back when there are no more requests.
|
||||
/// assert_eq!(rsps.into_inner(), FirstLetter);
|
||||
/// }
|
||||
/// ```
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct CallAll<Svc, S>
|
||||
where
|
||||
Svc: Service<S::Item>,
|
||||
S: Stream,
|
||||
{
|
||||
#[pin]
|
||||
inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>,
|
||||
}
|
||||
|
||||
|
@ -90,7 +91,6 @@ where
|
|||
Svc: Service<S::Item>,
|
||||
Svc::Error: Into<Error>,
|
||||
S: Stream,
|
||||
S::Error: Into<Error>,
|
||||
{
|
||||
/// Create new `CallAll` combinator.
|
||||
///
|
||||
|
@ -103,10 +103,25 @@ where
|
|||
}
|
||||
|
||||
/// Extract the wrapped `Service`.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `take_service` was already called.
|
||||
pub fn into_inner(self) -> Svc {
|
||||
self.inner.into_inner()
|
||||
}
|
||||
|
||||
/// Extract the wrapped `Service`.
|
||||
///
|
||||
/// This `CallAll` can no longer be used after this function has been called.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `take_service` was already called.
|
||||
pub fn take_service(mut self: Pin<&mut Self>) -> Svc {
|
||||
self.project().inner.take_service()
|
||||
}
|
||||
|
||||
/// Return responses as they are ready, regardless of the initial order.
|
||||
///
|
||||
/// This function must be called before the stream is polled.
|
||||
|
@ -122,28 +137,26 @@ where
|
|||
impl<Svc, S> Stream for CallAll<Svc, S>
|
||||
where
|
||||
Svc: Service<S::Item>,
|
||||
Svc::Error: Into<Error>,
|
||||
Error: From<Svc::Error>,
|
||||
S: Stream,
|
||||
S::Error: Into<Error>,
|
||||
{
|
||||
type Item = Svc::Response;
|
||||
type Error = Error;
|
||||
type Item = Result<Svc::Response, Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
self.inner.poll()
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.project().inner.poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Future> common::Drive<T> for FuturesOrdered<T> {
|
||||
impl<F: Future> common::Drive<F> for FuturesOrdered<F> {
|
||||
fn is_empty(&self) -> bool {
|
||||
FuturesOrdered::is_empty(self)
|
||||
}
|
||||
|
||||
fn push(&mut self, future: T) {
|
||||
fn push(&mut self, future: F) {
|
||||
FuturesOrdered::push(self, future)
|
||||
}
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
|
||||
Stream::poll(self)
|
||||
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
|
||||
Stream::poll_next(Pin::new(self), cx)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,19 +1,28 @@
|
|||
//! `Stream<Item = Request>` + `Service<Request>` => `Stream<Item = Response>`.
|
||||
|
||||
use super::{common, Error};
|
||||
use futures::{stream::FuturesUnordered, Future, Poll, Stream};
|
||||
use futures_core::Stream;
|
||||
use futures_util::stream::FuturesUnordered;
|
||||
use pin_project::pin_project;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
/// A stream of responses received from the inner service in received order.
|
||||
///
|
||||
/// Similar to `CallAll` except, instead of yielding responses in request order,
|
||||
/// responses are returned as they are available.
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct CallAllUnordered<Svc, S>
|
||||
where
|
||||
Svc: Service<S::Item>,
|
||||
S: Stream,
|
||||
{
|
||||
#[pin]
|
||||
inner: common::CallAll<Svc, S, FuturesUnordered<Svc::Future>>,
|
||||
}
|
||||
|
||||
|
@ -22,7 +31,6 @@ where
|
|||
Svc: Service<S::Item>,
|
||||
Svc::Error: Into<Error>,
|
||||
S: Stream,
|
||||
S::Error: Into<Error>,
|
||||
{
|
||||
/// Create new `CallAllUnordered` combinator.
|
||||
///
|
||||
|
@ -36,36 +44,49 @@ where
|
|||
}
|
||||
|
||||
/// Extract the wrapped `Service`.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `take_service` was already called.
|
||||
pub fn into_inner(self) -> Svc {
|
||||
self.inner.into_inner()
|
||||
}
|
||||
|
||||
/// Extract the wrapped `Service`.
|
||||
///
|
||||
/// This `CallAll` can no longer be used after this function has been called.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `take_service` was already called.
|
||||
pub fn take_service(mut self: Pin<&mut Self>) -> Svc {
|
||||
self.project().inner.take_service()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Svc, S> Stream for CallAllUnordered<Svc, S>
|
||||
where
|
||||
Svc: Service<S::Item>,
|
||||
Svc::Error: Into<Error>,
|
||||
Error: From<Svc::Error>,
|
||||
S: Stream,
|
||||
S::Error: Into<Error>,
|
||||
{
|
||||
type Item = Svc::Response;
|
||||
type Error = Error;
|
||||
type Item = Result<Svc::Response, Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
self.inner.poll()
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.project().inner.poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Future> common::Drive<T> for FuturesUnordered<T> {
|
||||
impl<F: Future> common::Drive<F> for FuturesUnordered<F> {
|
||||
fn is_empty(&self) -> bool {
|
||||
FuturesUnordered::is_empty(self)
|
||||
}
|
||||
|
||||
fn push(&mut self, future: T) {
|
||||
fn push(&mut self, future: F) {
|
||||
FuturesUnordered::push(self, future)
|
||||
}
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
|
||||
Stream::poll(self)
|
||||
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
|
||||
Stream::poll_next(Pin::new(self), cx)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,13 @@
|
|||
//!
|
||||
//! See `Either` documentation for more details.
|
||||
|
||||
use futures::{Future, Poll};
|
||||
use futures_util::ready;
|
||||
use pin_project::{pin_project, project};
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
/// Combine two different service types into a single type.
|
||||
|
@ -10,10 +16,11 @@ use tower_service::Service;
|
|||
/// Both services must be of the same request, response, and error types.
|
||||
/// `Either` is useful for handling conditional branching in service middleware
|
||||
/// to different inner service types.
|
||||
#[pin_project]
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Either<A, B> {
|
||||
A(A),
|
||||
B(B),
|
||||
A(#[pin] A),
|
||||
B(#[pin] B),
|
||||
}
|
||||
|
||||
type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||
|
@ -21,20 +28,20 @@ type Error = Box<dyn std::error::Error + Send + Sync>;
|
|||
impl<A, B, Request> Service<Request> for Either<A, B>
|
||||
where
|
||||
A: Service<Request>,
|
||||
A::Error: Into<Error>,
|
||||
Error: From<A::Error>,
|
||||
B: Service<Request, Response = A::Response>,
|
||||
B::Error: Into<Error>,
|
||||
Error: From<B::Error>,
|
||||
{
|
||||
type Response = A::Response;
|
||||
type Error = Error;
|
||||
type Future = Either<A::Future, B::Future>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
use self::Either::*;
|
||||
|
||||
match self {
|
||||
A(service) => service.poll_ready().map_err(Into::into),
|
||||
B(service) => service.poll_ready().map_err(Into::into),
|
||||
A(service) => Poll::Ready(Ok(ready!(service.poll_ready(cx))?)),
|
||||
B(service) => Poll::Ready(Ok(ready!(service.poll_ready(cx))?)),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -48,22 +55,21 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<A, B> Future for Either<A, B>
|
||||
impl<A, B, T, AE, BE> Future for Either<A, B>
|
||||
where
|
||||
A: Future,
|
||||
A::Error: Into<Error>,
|
||||
B: Future<Item = A::Item>,
|
||||
B::Error: Into<Error>,
|
||||
A: Future<Output = Result<T, AE>>,
|
||||
Error: From<AE>,
|
||||
B: Future<Output = Result<T, BE>>,
|
||||
Error: From<BE>,
|
||||
{
|
||||
type Item = A::Item;
|
||||
type Error = Error;
|
||||
type Output = Result<T, Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
use self::Either::*;
|
||||
|
||||
match self {
|
||||
A(fut) => fut.poll().map_err(Into::into),
|
||||
B(fut) => fut.poll().map_err(Into::into),
|
||||
#[project]
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
#[project]
|
||||
match self.project() {
|
||||
Either::A(fut) => Poll::Ready(Ok(ready!(fut.poll(cx))?)),
|
||||
Either::B(fut) => Poll::Ready(Ok(ready!(fut.poll(cx))?)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,31 +1,39 @@
|
|||
#![doc(html_root_url = "https://docs.rs/tower-util/0.1.0")]
|
||||
#![doc(html_root_url = "https://docs.rs/tower-util/0.3.0-alpha.1")]
|
||||
#![deny(rust_2018_idioms)]
|
||||
#![allow(elided_lifetimes_in_paths)]
|
||||
|
||||
//! Various utility types and functions that are generally with Tower.
|
||||
|
||||
//mod boxed;
|
||||
//mod call_all;
|
||||
//mod either;
|
||||
//pub mod layer;
|
||||
//mod oneshot;
|
||||
//mod optional;
|
||||
//mod ready;
|
||||
//mod sealed;
|
||||
mod boxed;
|
||||
mod call_all;
|
||||
mod either;
|
||||
pub mod layer;
|
||||
mod oneshot;
|
||||
mod optional;
|
||||
mod ready;
|
||||
mod sealed;
|
||||
mod service_fn;
|
||||
|
||||
pub use crate::service_fn::{service_fn, ServiceFn};
|
||||
pub use crate::{
|
||||
boxed::{BoxService, UnsyncBoxService},
|
||||
call_all::{CallAll, CallAllUnordered},
|
||||
either::Either,
|
||||
oneshot::Oneshot,
|
||||
optional::Optional,
|
||||
ready::Ready,
|
||||
service_fn::{service_fn, ServiceFn},
|
||||
};
|
||||
|
||||
//pub mod error {
|
||||
// //! Error types
|
||||
//
|
||||
// pub use crate::optional::error as optional;
|
||||
//}
|
||||
//
|
||||
//pub mod future {
|
||||
// //! Future types
|
||||
//
|
||||
// #[cfg(feature = "either")]
|
||||
// pub use crate::either::future as either;
|
||||
// pub use crate::optional::future as optional;
|
||||
//}
|
||||
pub mod error {
|
||||
//! Error types
|
||||
|
||||
pub use crate::optional::error as optional;
|
||||
}
|
||||
|
||||
pub mod future {
|
||||
//! Future types
|
||||
|
||||
#[cfg(feature = "either")]
|
||||
pub use crate::either::future as either;
|
||||
pub use crate::optional::future as optional;
|
||||
}
|
||||
|
|
|
@ -1,19 +1,26 @@
|
|||
use std::mem;
|
||||
|
||||
use futures::{Async, Future, Poll};
|
||||
use futures_util::ready;
|
||||
use pin_project::{pin_project, project};
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
/// A `Future` consuming a `Service` and request, waiting until the `Service`
|
||||
/// is ready, and then calling `Service::call` with the request, and
|
||||
/// waiting for that `Future`.
|
||||
#[pin_project]
|
||||
pub struct Oneshot<S: Service<Req>, Req> {
|
||||
#[pin]
|
||||
state: State<S, Req>,
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
enum State<S: Service<Req>, Req> {
|
||||
NotReady(S, Req),
|
||||
Called(S::Future),
|
||||
Tmp,
|
||||
NotReady(Option<(S, Req)>),
|
||||
Called(#[pin] S::Future),
|
||||
Done,
|
||||
}
|
||||
|
||||
impl<S, Req> Oneshot<S, Req>
|
||||
|
@ -22,7 +29,7 @@ where
|
|||
{
|
||||
pub fn new(svc: S, req: Req) -> Self {
|
||||
Oneshot {
|
||||
state: State::NotReady(svc, req),
|
||||
state: State::NotReady(Some((svc, req))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,31 +38,25 @@ impl<S, Req> Future for Oneshot<S, Req>
|
|||
where
|
||||
S: Service<Req>,
|
||||
{
|
||||
type Item = S::Response;
|
||||
type Error = S::Error;
|
||||
type Output = Result<S::Response, S::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
#[project]
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.project();
|
||||
loop {
|
||||
match mem::replace(&mut self.state, State::Tmp) {
|
||||
State::NotReady(mut svc, req) => match svc.poll_ready()? {
|
||||
Async::Ready(()) => {
|
||||
self.state = State::Called(svc.call(req));
|
||||
}
|
||||
Async::NotReady => {
|
||||
self.state = State::NotReady(svc, req);
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
},
|
||||
State::Called(mut fut) => match fut.poll()? {
|
||||
Async::Ready(res) => {
|
||||
return Ok(Async::Ready(res));
|
||||
}
|
||||
Async::NotReady => {
|
||||
self.state = State::Called(fut);
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
},
|
||||
State::Tmp => panic!("polled after complete"),
|
||||
#[project]
|
||||
match this.state.project() {
|
||||
State::NotReady(nr) => {
|
||||
let (mut svc, req) = nr.take().expect("We immediately transition to ::Called");
|
||||
let _ = ready!(svc.poll_ready(cx))?;
|
||||
this.state.set(State::Called(svc.call(req)));
|
||||
}
|
||||
State::Called(fut) => {
|
||||
let res = ready!(fut.poll(cx))?;
|
||||
this.state.set(State::Done);
|
||||
return Poll::Ready(Ok(res));
|
||||
}
|
||||
State::Done => panic!("polled after complete"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,16 @@
|
|||
use super::{error, Error};
|
||||
use futures::{Future, Poll};
|
||||
use futures_util::ready;
|
||||
use pin_project::pin_project;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
/// Response future returned by `Optional`.
|
||||
#[pin_project]
|
||||
pub struct ResponseFuture<T> {
|
||||
#[pin]
|
||||
inner: Option<T>,
|
||||
}
|
||||
|
||||
|
@ -12,18 +20,17 @@ impl<T> ResponseFuture<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Future for ResponseFuture<T>
|
||||
impl<F, T, E> Future for ResponseFuture<F>
|
||||
where
|
||||
T: Future,
|
||||
T::Error: Into<Error>,
|
||||
F: Future<Output = Result<T, E>>,
|
||||
Error: From<E>,
|
||||
{
|
||||
type Item = T::Item;
|
||||
type Error = Error;
|
||||
type Output = Result<T, Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self.inner {
|
||||
Some(ref mut inner) => inner.poll().map_err(Into::into),
|
||||
None => Err(error::None::new().into()),
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match self.project().inner.as_pin_mut() {
|
||||
Some(inner) => Poll::Ready(Ok(ready!(inner.poll(cx))?)),
|
||||
None => Poll::Ready(Err(error::None::new().into())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ pub mod error;
|
|||
pub mod future;
|
||||
|
||||
use self::{error::Error, future::ResponseFuture};
|
||||
use futures::Poll;
|
||||
use std::task::{Context, Poll};
|
||||
use tower_service::Service;
|
||||
|
||||
/// Optionally forwards requests to an inner service.
|
||||
|
@ -31,17 +31,20 @@ impl<T> Optional<T> {
|
|||
impl<T, Request> Service<Request> for Optional<T>
|
||||
where
|
||||
T: Service<Request>,
|
||||
T::Error: Into<Error>,
|
||||
Error: From<T::Error>,
|
||||
{
|
||||
type Response = T::Response;
|
||||
type Error = Error;
|
||||
type Future = ResponseFuture<T::Future>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
match self.inner {
|
||||
Some(ref mut inner) => inner.poll_ready().map_err(Into::into),
|
||||
Some(ref mut inner) => match inner.poll_ready(cx) {
|
||||
Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)),
|
||||
Poll::Pending => Poll::Pending,
|
||||
},
|
||||
// None services are always ready
|
||||
None => Ok(().into()),
|
||||
None => Poll::Ready(Ok(())),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,11 +1,18 @@
|
|||
use std::{fmt, marker::PhantomData};
|
||||
|
||||
use futures::{try_ready, Future, Poll};
|
||||
use futures_util::ready;
|
||||
use pin_project::pin_project;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
/// Future yielding a `Service` once the service is ready to process a request
|
||||
///
|
||||
/// `Ready` values are produced by `ServiceExt::ready`.
|
||||
#[pin_project]
|
||||
pub struct Ready<T, Request> {
|
||||
inner: Option<T>,
|
||||
_p: PhantomData<fn() -> Request>,
|
||||
|
@ -27,18 +34,17 @@ impl<T, Request> Future for Ready<T, Request>
|
|||
where
|
||||
T: Service<Request>,
|
||||
{
|
||||
type Item = T;
|
||||
type Error = T::Error;
|
||||
type Output = Result<T, T::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<T, T::Error> {
|
||||
match self.inner {
|
||||
Some(ref mut service) => {
|
||||
let _ = try_ready!(service.poll_ready());
|
||||
}
|
||||
None => panic!("called `poll` after future completed"),
|
||||
}
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
ready!(this
|
||||
.inner
|
||||
.as_mut()
|
||||
.expect("called `poll` after future completed")
|
||||
.poll_ready(cx))?;
|
||||
|
||||
Ok(self.inner.take().unwrap().into())
|
||||
Poll::Ready(Ok(this.inner.take().unwrap()))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
#![cfg(feature = "broken")]
|
||||
|
||||
use futures::{
|
||||
self,
|
||||
future::{ok, FutureResult},
|
||||
stream, Async, Poll, Stream,
|
||||
use futures_core::Stream;
|
||||
use futures_util::{
|
||||
future::{ready, Ready},
|
||||
pin_mut,
|
||||
};
|
||||
use std::task::{Context, Poll};
|
||||
use std::{cell::Cell, rc::Rc};
|
||||
use tokio_test::{assert_pending, assert_ready, task};
|
||||
use tower::ServiceExt;
|
||||
use tower_service::*;
|
||||
use tower_test::{assert_request_eq, mock};
|
||||
|
@ -20,46 +20,26 @@ struct Srv {
|
|||
impl Service<&'static str> for Srv {
|
||||
type Response = &'static str;
|
||||
type Error = Error;
|
||||
type Future = FutureResult<Self::Response, Error>;
|
||||
type Future = Ready<Result<Self::Response, Error>>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
if !self.admit.get() {
|
||||
return Ok(Async::NotReady);
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
self.admit.set(false);
|
||||
Ok(Async::Ready(()))
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: &'static str) -> Self::Future {
|
||||
self.count.set(self.count.get() + 1);
|
||||
ok(req)
|
||||
ready(Ok(req))
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! assert_ready {
|
||||
($e:expr) => {{
|
||||
match $e {
|
||||
Ok(futures::Async::Ready(v)) => v,
|
||||
Ok(_) => panic!("not ready"),
|
||||
Err(e) => panic!("error = {:?}", e),
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
macro_rules! assert_not_ready {
|
||||
($e:expr) => {{
|
||||
match $e {
|
||||
Ok(futures::Async::NotReady) => {}
|
||||
Ok(futures::Async::Ready(v)) => panic!("ready; value = {:?}", v),
|
||||
Err(e) => panic!("error = {:?}", e),
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ordered() {
|
||||
let mut mock = tokio_mock_task::MockTask::new();
|
||||
let mut mock = task::MockTask::new();
|
||||
|
||||
let admit = Rc::new(Cell::new(false));
|
||||
let count = Rc::new(Cell::new(0));
|
||||
|
@ -67,49 +47,60 @@ fn ordered() {
|
|||
count: count.clone(),
|
||||
admit: admit.clone(),
|
||||
};
|
||||
let (tx, rx) = futures::unsync::mpsc::unbounded();
|
||||
let mut ca = srv.call_all(rx.map_err(|_| "nope"));
|
||||
let (mut tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let ca = srv.call_all(rx);
|
||||
pin_mut!(ca);
|
||||
|
||||
assert_not_ready!(mock.enter(|| ca.poll()));
|
||||
tx.unbounded_send("one").unwrap();
|
||||
mock.is_notified();
|
||||
assert_not_ready!(mock.enter(|| ca.poll()));
|
||||
assert_pending!(mock.enter(|cx| ca.as_mut().poll_next(cx)));
|
||||
tx.try_send("one").unwrap();
|
||||
mock.is_woken();
|
||||
assert_pending!(mock.enter(|cx| ca.as_mut().poll_next(cx)));
|
||||
admit.set(true);
|
||||
let v = assert_ready!(mock.enter(|| ca.poll()));
|
||||
let v = assert_ready!(mock.enter(|cx| ca.as_mut().poll_next(cx)))
|
||||
.transpose()
|
||||
.unwrap();
|
||||
assert_eq!(v, Some("one"));
|
||||
assert_not_ready!(mock.enter(|| ca.poll()));
|
||||
assert_pending!(mock.enter(|cx| ca.as_mut().poll_next(cx)));
|
||||
admit.set(true);
|
||||
tx.unbounded_send("two").unwrap();
|
||||
mock.is_notified();
|
||||
tx.unbounded_send("three").unwrap();
|
||||
let v = assert_ready!(mock.enter(|| ca.poll()));
|
||||
tx.try_send("two").unwrap();
|
||||
mock.is_woken();
|
||||
tx.try_send("three").unwrap();
|
||||
let v = assert_ready!(mock.enter(|cx| ca.as_mut().poll_next(cx)))
|
||||
.transpose()
|
||||
.unwrap();
|
||||
assert_eq!(v, Some("two"));
|
||||
assert_not_ready!(mock.enter(|| ca.poll()));
|
||||
assert_pending!(mock.enter(|cx| ca.as_mut().poll_next(cx)));
|
||||
admit.set(true);
|
||||
let v = assert_ready!(mock.enter(|| ca.poll()));
|
||||
let v = assert_ready!(mock.enter(|cx| ca.as_mut().poll_next(cx)))
|
||||
.transpose()
|
||||
.unwrap();
|
||||
assert_eq!(v, Some("three"));
|
||||
admit.set(true);
|
||||
assert_not_ready!(mock.enter(|| ca.poll()));
|
||||
assert_pending!(mock.enter(|cx| ca.as_mut().poll_next(cx)));
|
||||
admit.set(true);
|
||||
tx.unbounded_send("four").unwrap();
|
||||
mock.is_notified();
|
||||
let v = assert_ready!(mock.enter(|| ca.poll()));
|
||||
tx.try_send("four").unwrap();
|
||||
mock.is_woken();
|
||||
let v = assert_ready!(mock.enter(|cx| ca.as_mut().poll_next(cx)))
|
||||
.transpose()
|
||||
.unwrap();
|
||||
assert_eq!(v, Some("four"));
|
||||
assert_not_ready!(mock.enter(|| ca.poll()));
|
||||
assert_pending!(mock.enter(|cx| ca.as_mut().poll_next(cx)));
|
||||
|
||||
// need to be ready since impl doesn't know it'll get EOF
|
||||
admit.set(true);
|
||||
|
||||
// When we drop the request stream, CallAll should return None.
|
||||
drop(tx);
|
||||
mock.is_notified();
|
||||
let v = assert_ready!(mock.enter(|| ca.poll()));
|
||||
mock.is_woken();
|
||||
let v = assert_ready!(mock.enter(|cx| ca.as_mut().poll_next(cx)))
|
||||
.transpose()
|
||||
.unwrap();
|
||||
assert!(v.is_none());
|
||||
assert_eq!(count.get(), 4);
|
||||
|
||||
// We should also be able to recover the wrapped Service.
|
||||
assert_eq!(
|
||||
ca.into_inner(),
|
||||
ca.take_service(),
|
||||
Srv {
|
||||
count: count.clone(),
|
||||
admit
|
||||
|
@ -119,27 +110,37 @@ fn ordered() {
|
|||
|
||||
#[test]
|
||||
fn unordered() {
|
||||
let (mock, mut handle) = mock::pair::<_, &'static str>();
|
||||
let mut task = tokio_mock_task::MockTask::new();
|
||||
let requests = stream::iter_ok::<_, Error>(&["one", "two"]);
|
||||
let (mock, handle) = mock::pair::<_, &'static str>();
|
||||
pin_mut!(handle);
|
||||
|
||||
let mut svc = mock.call_all(requests).unordered();
|
||||
assert_not_ready!(task.enter(|| svc.poll()));
|
||||
let mut task = task::MockTask::new();
|
||||
let requests = futures_util::stream::iter(&["one", "two"]);
|
||||
|
||||
let svc = mock.call_all(requests).unordered();
|
||||
pin_mut!(svc);
|
||||
|
||||
assert_pending!(task.enter(|cx| svc.as_mut().poll_next(cx)));
|
||||
|
||||
let resp1 = assert_request_eq!(handle, &"one");
|
||||
let resp2 = assert_request_eq!(handle, &"two");
|
||||
|
||||
resp2.send_response("resp 1");
|
||||
|
||||
let v = assert_ready!(task.enter(|| svc.poll()));
|
||||
let v = assert_ready!(task.enter(|cx| svc.as_mut().poll_next(cx)))
|
||||
.transpose()
|
||||
.unwrap();
|
||||
assert_eq!(v, Some("resp 1"));
|
||||
assert_not_ready!(task.enter(|| svc.poll()));
|
||||
assert_pending!(task.enter(|cx| svc.as_mut().poll_next(cx)));
|
||||
|
||||
resp1.send_response("resp 2");
|
||||
|
||||
let v = assert_ready!(task.enter(|| svc.poll()));
|
||||
let v = assert_ready!(task.enter(|cx| svc.as_mut().poll_next(cx)))
|
||||
.transpose()
|
||||
.unwrap();
|
||||
assert_eq!(v, Some("resp 2"));
|
||||
|
||||
let v = assert_ready!(task.enter(|| svc.poll()));
|
||||
let v = assert_ready!(task.enter(|cx| svc.as_mut().poll_next(cx)))
|
||||
.transpose()
|
||||
.unwrap();
|
||||
assert!(v.is_none());
|
||||
}
|
||||
|
|
|
@ -1,13 +1,11 @@
|
|||
use futures::executor::block_on;
|
||||
use futures::future;
|
||||
use futures_util::future::ready;
|
||||
use tokio_test::block_on;
|
||||
use tower_service::Service;
|
||||
use tower_util::service_fn;
|
||||
|
||||
#[test]
|
||||
fn simple() {
|
||||
block_on(async {
|
||||
let mut add_one = service_fn(|req| future::ok::<_, ()>(req + 1));
|
||||
let answer = add_one.call(1).await.unwrap();
|
||||
assert_eq!(answer, 2);
|
||||
});
|
||||
let mut add_one = service_fn(|req| ready(Ok::<_, ()>(req + 1)));
|
||||
let answer = block_on(add_one.call(1)).unwrap();
|
||||
assert_eq!(answer, 2);
|
||||
}
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
# 0.3.0-alpha.1
|
||||
|
||||
- Move to `std::future`
|
||||
|
||||
# 0.1.1 (July 19, 2019)
|
||||
|
||||
- Add `ServiceBuilder::into_inner`
|
||||
|
|
|
@ -8,13 +8,13 @@ name = "tower"
|
|||
# - README.md
|
||||
# - Update CHANGELOG.md.
|
||||
# - Create "v0.1.x" git tag.
|
||||
version = "0.1.1"
|
||||
version = "0.3.0-alpha.1"
|
||||
authors = ["Tower Maintainers <team@tower-rs.com>"]
|
||||
license = "MIT"
|
||||
readme = "README.md"
|
||||
repository = "https://github.com/tower-rs/tower"
|
||||
homepage = "https://github.com/tower-rs/tower"
|
||||
documentation = "https://docs.rs/tower/0.1.1"
|
||||
documentation = "https://docs.rs/tower/0.3.0-alpha.1"
|
||||
description = """
|
||||
Tower is a library of modular and reusable components for building robust
|
||||
clients and servers.
|
||||
|
@ -28,20 +28,21 @@ default = ["full"]
|
|||
full = []
|
||||
|
||||
[dependencies]
|
||||
futures = "0.1.26"
|
||||
tower-buffer = "0.1.0"
|
||||
tower-discover = "0.1.0"
|
||||
tower-layer = "0.1.0"
|
||||
tower-limit = "0.1.0"
|
||||
tower-load-shed = "0.1.0"
|
||||
tower-retry = "0.1.0"
|
||||
tower-service = "0.2.0"
|
||||
tower-timeout = "0.1.0"
|
||||
tower-util = { version = "0.1.0", features = ["io"] }
|
||||
tower-buffer = { version = "0.3.0-alpha.1", path = "../tower-buffer" }
|
||||
tower-discover = { version = "0.3.0-alpha.1", path = "../tower-discover" }
|
||||
tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" }
|
||||
tower-limit = { version = "0.3.0-alpha.1", path = "../tower-limit" }
|
||||
tower-load-shed = { version = "0.3.0-alpha.1", path = "../tower-load-shed" }
|
||||
tower-retry = { version = "0.3.0-alpha.1", path = "../tower-retry" }
|
||||
tower-service = "0.3.0-alpha.1"
|
||||
tower-timeout = { version = "0.3.0-alpha.1", path = "../tower-timeout" }
|
||||
#tower-util = { version = "0.3.0-alpha.1", path = "../tower-util", features = ["io"] }
|
||||
tower-util = { version = "0.3.0-alpha.1", path = "../tower-util" }
|
||||
futures-core-preview = "0.3.0-alpha.18"
|
||||
|
||||
[dev-dependencies]
|
||||
futures = "0.1.26"
|
||||
log = "0.4.1"
|
||||
tokio = "0.1"
|
||||
env_logger = { version = "0.5.3", default-features = false }
|
||||
void = "1.0.2"
|
||||
futures-util-preview = "0.3.0-alpha.18"
|
||||
log = "0.4.1"
|
||||
tokio = "0.2.0-alpha.1"
|
||||
tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" }
|
||||
|
|
|
@ -40,15 +40,11 @@ use std::{fmt, time::Duration};
|
|||
/// ```
|
||||
/// # use tower::Service;
|
||||
/// # use tower::builder::ServiceBuilder;
|
||||
/// # fn dox<T>(my_service: T)
|
||||
/// # where T: Service<()> + Send + 'static,
|
||||
/// # T::Future: Send,
|
||||
/// # T::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
/// # {
|
||||
/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
|
||||
/// ServiceBuilder::new()
|
||||
/// .buffer(100)
|
||||
/// .concurrency_limit(10)
|
||||
/// .service(my_service)
|
||||
/// .service(svc)
|
||||
/// # ;
|
||||
/// # }
|
||||
/// ```
|
||||
|
@ -62,15 +58,11 @@ use std::{fmt, time::Duration};
|
|||
/// ```
|
||||
/// # use tower::Service;
|
||||
/// # use tower::builder::ServiceBuilder;
|
||||
/// # fn dox<T>(my_service: T)
|
||||
/// # where T: Service<()> + Send + 'static,
|
||||
/// # T::Future: Send,
|
||||
/// # T::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
/// # {
|
||||
/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
|
||||
/// ServiceBuilder::new()
|
||||
/// .concurrency_limit(10)
|
||||
/// .buffer(100)
|
||||
/// .service(my_service)
|
||||
/// .service(svc)
|
||||
/// # ;
|
||||
/// # }
|
||||
/// ```
|
||||
|
@ -84,63 +76,32 @@ use std::{fmt, time::Duration};
|
|||
/// A `Service` stack with a single layer:
|
||||
///
|
||||
/// ```
|
||||
/// # extern crate tower;
|
||||
/// # extern crate tower_limit;
|
||||
/// # extern crate futures;
|
||||
/// # extern crate void;
|
||||
/// # use void::Void;
|
||||
/// # use tower::Service;
|
||||
/// # use tower::builder::ServiceBuilder;
|
||||
/// # use tower_limit::concurrency::ConcurrencyLimitLayer;
|
||||
/// # use futures::{Poll, future::{self, FutureResult}};
|
||||
/// # #[derive(Debug)]
|
||||
/// # struct MyService;
|
||||
/// # impl Service<()> for MyService {
|
||||
/// # type Response = ();
|
||||
/// # type Error = Void;
|
||||
/// # type Future = FutureResult<Self::Response, Self::Error>;
|
||||
/// # fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
/// # Ok(().into())
|
||||
/// # }
|
||||
/// # fn call(&mut self, _: ()) -> Self::Future {
|
||||
/// # future::ok(())
|
||||
/// # }
|
||||
/// # }
|
||||
/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
|
||||
/// ServiceBuilder::new()
|
||||
/// .concurrency_limit(5)
|
||||
/// .service(MyService);
|
||||
/// .service(svc);
|
||||
/// # ;
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// A `Service` stack with _multiple_ layers that contain rate limiting,
|
||||
/// in-flight request limits, and a channel-backed, clonable `Service`:
|
||||
///
|
||||
/// ```
|
||||
/// # extern crate tower;
|
||||
/// # extern crate futures;
|
||||
/// # extern crate void;
|
||||
/// # use void::Void;
|
||||
/// # use tower::Service;
|
||||
/// # use tower::builder::ServiceBuilder;
|
||||
/// # use std::time::Duration;
|
||||
/// # use futures::{Poll, future::{self, FutureResult}};
|
||||
/// # #[derive(Debug)]
|
||||
/// # struct MyService;
|
||||
/// # impl Service<()> for MyService {
|
||||
/// # type Response = ();
|
||||
/// # type Error = Void;
|
||||
/// # type Future = FutureResult<Self::Response, Self::Error>;
|
||||
/// # fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
/// # Ok(().into())
|
||||
/// # }
|
||||
/// # fn call(&mut self, _: ()) -> Self::Future {
|
||||
/// # future::ok(())
|
||||
/// # }
|
||||
/// # }
|
||||
/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
|
||||
/// ServiceBuilder::new()
|
||||
/// .buffer(5)
|
||||
/// .concurrency_limit(5)
|
||||
/// .rate_limit(5, Duration::from_secs(1))
|
||||
/// .service(MyService);
|
||||
/// .service(svc);
|
||||
/// # ;
|
||||
/// # }
|
||||
/// ```
|
||||
#[derive(Clone)]
|
||||
pub struct ServiceBuilder<L> {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
#![doc(html_root_url = "https://docs.rs/tower/0.1.1")]
|
||||
#![doc(html_root_url = "https://docs.rs/tower/0.3.0-alpha.1")]
|
||||
// Allows refining features in the future without breaking backwards
|
||||
// compatibility
|
||||
#![cfg(feature = "full")]
|
||||
|
@ -30,4 +30,4 @@ pub mod util;
|
|||
|
||||
pub use crate::{builder::ServiceBuilder, util::ServiceExt};
|
||||
pub use tower_service::Service;
|
||||
pub use tower_util::{service_fn, MakeConnection, MakeService};
|
||||
pub use tower_util::service_fn;
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Combinators for working with `Service`s
|
||||
|
||||
use futures::Stream;
|
||||
use futures_core::Stream;
|
||||
use tower_service::Service;
|
||||
pub use tower_util::{
|
||||
BoxService, CallAll, CallAllUnordered, Either, Oneshot, Optional, Ready, UnsyncBoxService,
|
||||
|
@ -38,7 +38,6 @@ pub trait ServiceExt<Request>: Service<Request> {
|
|||
Self: Sized,
|
||||
Self::Error: Into<Error>,
|
||||
S: Stream<Item = Request>,
|
||||
S::Error: Into<Error>,
|
||||
{
|
||||
CallAll::new(self, reqs)
|
||||
}
|
||||
|
|
|
@ -1,69 +1,52 @@
|
|||
use futures::{
|
||||
future::{self, FutureResult},
|
||||
prelude::*,
|
||||
};
|
||||
use std::time::Duration;
|
||||
use futures_util::{pin_mut, future::{Ready, poll_fn}};
|
||||
use tower::builder::ServiceBuilder;
|
||||
use tower::util::ServiceExt;
|
||||
use tower_buffer::BufferLayer;
|
||||
use tower_limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer};
|
||||
use tower_retry::{Policy, RetryLayer};
|
||||
use tower_service::*;
|
||||
use void::Void;
|
||||
use tower_test::{mock};
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_service() {
|
||||
let (service, handle) = mock::pair();
|
||||
pin_mut!(handle);
|
||||
|
||||
#[test]
|
||||
fn builder_service() {
|
||||
tokio::run(future::lazy(|| {
|
||||
let policy = MockPolicy;
|
||||
let mut client = ServiceBuilder::new()
|
||||
let client = ServiceBuilder::new()
|
||||
.layer(BufferLayer::new(5))
|
||||
.layer(ConcurrencyLimitLayer::new(5))
|
||||
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
|
||||
.layer(RetryLayer::new(policy))
|
||||
.layer(BufferLayer::new(5))
|
||||
.service(MockSvc);
|
||||
.service(service);
|
||||
|
||||
client.poll_ready().unwrap();
|
||||
client
|
||||
.call(Request)
|
||||
.map(|_| ())
|
||||
.map_err(|_| panic!("this is bad"))
|
||||
}));
|
||||
}
|
||||
// allow a request through
|
||||
handle.allow(1);
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Request;
|
||||
#[derive(Debug, Clone)]
|
||||
struct Response;
|
||||
#[derive(Debug)]
|
||||
struct MockSvc;
|
||||
impl Service<Request> for MockSvc {
|
||||
type Response = Response;
|
||||
type Error = Void;
|
||||
type Future = FutureResult<Self::Response, Self::Error>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(().into())
|
||||
}
|
||||
|
||||
fn call(&mut self, _: Request) -> Self::Future {
|
||||
future::ok(Response)
|
||||
}
|
||||
let mut client = client.ready().await.unwrap();
|
||||
let fut = client.call("hello");
|
||||
let (request, rsp) = poll_fn(|cx| handle.as_mut().poll_request(cx)).await.unwrap();
|
||||
assert_eq!(request, "hello");
|
||||
rsp.send_response("world");
|
||||
assert_eq!(fut.await.unwrap(), "world");
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct MockPolicy;
|
||||
|
||||
impl<E> Policy<Request, Response, E> for MockPolicy
|
||||
impl<E> Policy<&'static str, &'static str, E> for MockPolicy
|
||||
where
|
||||
E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
|
||||
{
|
||||
type Future = FutureResult<Self, ()>;
|
||||
type Future = Ready<Self>;
|
||||
|
||||
fn retry(&self, _req: &Request, _result: Result<&Response, &E>) -> Option<Self::Future> {
|
||||
fn retry(&self, _req: &&'static str, _result: Result<&&'static str, &E>) -> Option<Self::Future> {
|
||||
None
|
||||
}
|
||||
|
||||
fn clone_request(&self, req: &Request) -> Option<Request> {
|
||||
fn clone_request(&self, req: &&'static str) -> Option<&'static str> {
|
||||
Some(req.clone())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue