Add Future-like combinators (#396)

This commit is contained in:
Harry Barber 2020-07-17 21:33:00 +01:00 committed by GitHub
parent 4316894422
commit ab7518ef13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 516 additions and 0 deletions

67
tower/src/util/map_err.rs Normal file
View File

@ -0,0 +1,67 @@
use futures_util::{future::MapErr as MapErrFut, TryFutureExt};
use std::task::{Context, Poll};
use tower_layer::Layer;
use tower_service::Service;
/// Service returned by the [`map_err`] combinator.
///
/// [`map_err`]: crate::util::ServiceExt::map_err
#[derive(Debug)]
pub struct MapErr<S, F> {
inner: S,
f: F,
}
impl<S, F> MapErr<S, F> {
/// Creates a new [`MapErr`] service.
pub fn new(inner: S, f: F) -> Self {
MapErr { f, inner }
}
}
impl<S, F, Request, Error> Service<Request> for MapErr<S, F>
where
S: Service<Request>,
F: FnOnce(S::Error) -> Error + Clone,
{
type Response = S::Response;
type Error = Error;
type Future = MapErrFut<S::Future, F>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(self.f.clone())
}
fn call(&mut self, request: Request) -> Self::Future {
self.inner.call(request).map_err(self.f.clone())
}
}
/// A [`Layer`] that produces a [`MapErr`] service.
///
/// [`Layer`]: tower_layer::Layer
#[derive(Debug)]
pub struct MapErrLayer<F> {
f: F,
}
impl<F> MapErrLayer<F> {
/// Creates a new [`MapErrLayer`].
pub fn new(f: F) -> Self {
MapErrLayer { f }
}
}
impl<S, F> Layer<S> for MapErrLayer<F>
where
F: Clone,
{
type Service = MapErr<S, F>;
fn layer(&self, inner: S) -> Self::Service {
MapErr {
f: self.f.clone(),
inner,
}
}
}

67
tower/src/util/map_ok.rs Normal file
View File

@ -0,0 +1,67 @@
use futures_util::{future::MapOk as MapOkFut, TryFutureExt};
use std::task::{Context, Poll};
use tower_layer::Layer;
use tower_service::Service;
/// Service returned by the [`map_ok`] combinator.
///
/// [`map_ok`]: crate::util::ServiceExt::map_ok
#[derive(Debug)]
pub struct MapOk<S, F> {
inner: S,
f: F,
}
impl<S, F> MapOk<S, F> {
/// Creates a new `MapOk` service.
pub fn new(inner: S, f: F) -> Self {
MapOk { f, inner }
}
}
impl<S, F, Request, Response> Service<Request> for MapOk<S, F>
where
S: Service<Request>,
F: FnOnce(S::Response) -> Response + Clone,
{
type Response = Response;
type Error = S::Error;
type Future = MapOkFut<S::Future, F>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, request: Request) -> Self::Future {
self.inner.call(request).map_ok(self.f.clone())
}
}
/// A [`Layer`] that produces a [`MapOk`] service.
///
/// [`Layer`]: tower_layer::Layer
#[derive(Debug)]
pub struct MapOkLayer<F> {
f: F,
}
impl<F> MapOkLayer<F> {
/// Creates a new [`MapOkLayer`] layer.
pub fn new(f: F) -> Self {
MapOkLayer { f }
}
}
impl<S, F> Layer<S> for MapOkLayer<F>
where
F: Clone,
{
type Service = MapOk<S, F>;
fn layer(&self, inner: S) -> Self::Service {
MapOk {
f: self.f.clone(),
inner,
}
}
}

View File

@ -4,19 +4,27 @@ mod boxed;
mod call_all;
mod either;
mod map;
mod map_err;
mod map_ok;
mod oneshot;
mod optional;
mod ready;
mod service_fn;
mod try_with;
mod with;
pub use self::{
boxed::{BoxService, UnsyncBoxService},
either::Either,
map::{MapRequest, MapRequestLayer, MapResponse, MapResponseLayer},
map_err::{MapErr, MapErrLayer},
map_ok::{MapOk, MapOkLayer},
oneshot::Oneshot,
optional::Optional,
ready::{Ready, ReadyAnd, ReadyOneshot},
service_fn::{service_fn, ServiceFn},
try_with::{TryWith, TryWithLayer},
with::{With, WithLayer},
};
pub use self::call_all::{CallAll, CallAllUnordered};
@ -82,6 +90,244 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
{
CallAll::new(self, reqs)
}
/// Maps this service's response value to a different value. This does not
/// alter the behaviour of the [`poll_ready`] method.
///
/// This method can be used to change the [`Response`] type of the service
/// into a different type. It is similar to the [`Result::map`]
/// method. You can use this method to chain along a computation once the
/// services response has been resolved.
///
/// [`Response`]: crate::Service::Response
/// [`poll_ready`]: crate::Service::poll_ready
///
/// # Example
/// ```
/// # use std::task::{Poll, Context};
/// # use tower::{Service, ServiceExt};
/// #
/// # struct DatabaseService;
/// # impl DatabaseService {
/// # fn new(address: &str) -> Self {
/// # DatabaseService
/// # }
/// # }
/// #
/// # struct Record {
/// # pub name: String,
/// # pub age: u16
/// # }
/// #
/// # impl Service<u32> for DatabaseService {
/// # type Response = Record;
/// # type Error = u8;
/// # type Future = futures_util::future::Ready<Result<Record, u8>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
/// # }
/// # }
/// #
/// # fn main() {
/// # async {
/// // A service returning Result<Record, _>
/// let service = DatabaseService::new("127.0.0.1:8080");
///
/// // Map the response into a new response
/// let mut new_service = service.map_ok(|record| record.name);
///
/// // Call the new service
/// let id = 13;
/// let name = new_service.call(id).await.unwrap();
/// # };
/// # }
/// ```
fn map_ok<F, Response>(self, f: F) -> MapOk<Self, F>
where
Self: Sized,
F: FnOnce(Self::Response) -> Response + Clone,
{
MapOk::new(self, f)
}
/// Maps this services's error value to a different value. This does not
/// alter the behaviour of the [`poll_ready`] method.
///
/// This method can be used to change the [`Error`] type of the service
/// into a different type. It is similar to the [`Result::map_err`] method.
///
/// [`Error`]: crate::Service::Error
/// [`poll_ready`]: crate::Service::poll_ready
///
/// # Example
/// ```
/// # use std::task::{Poll, Context};
/// # use tower::{Service, ServiceExt};
/// #
/// # struct DatabaseService;
/// # impl DatabaseService {
/// # fn new(address: &str) -> Self {
/// # DatabaseService
/// # }
/// # }
/// #
/// # struct Error {
/// # pub code: u32,
/// # pub message: String
/// # }
/// #
/// # impl Service<u32> for DatabaseService {
/// # type Response = String;
/// # type Error = Error;
/// # type Future = futures_util::future::Ready<Result<String, Error>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(String::new()))
/// # }
/// # }
/// #
/// # fn main() {
/// # async {
/// // A service returning Result<_, Error>
/// let service = DatabaseService::new("127.0.0.1:8080");
///
/// // Map the error to a new error
/// let mut new_service = service.map_err(|err| err.code);
///
/// // Call the new service
/// let id = 13;
/// let code = new_service.call(id).await.unwrap_err();
/// # };
/// # }
/// ```
fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
where
Self: Sized,
F: FnOnce(Self::Error) -> Error + Clone,
{
MapErr::new(self, f)
}
/// Composes a function *in front of* the service.
///
/// This adapter produces a new service that passes each value through the
/// given function `f` before sending it to `self`.
///
/// # Example
/// ```
/// # use std::convert::TryFrom;
/// # use std::task::{Poll, Context};
/// # use tower::{Service, ServiceExt};
/// #
/// # struct DatabaseService;
/// # impl DatabaseService {
/// # fn new(address: &str) -> Self {
/// # DatabaseService
/// # }
/// # }
/// #
/// # impl Service<String> for DatabaseService {
/// # type Response = String;
/// # type Error = u8;
/// # type Future = futures_util::future::Ready<Result<String, u8>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: String) -> Self::Future {
/// # futures_util::future::ready(Ok(String::new()))
/// # }
/// # }
/// #
/// # fn main() {
/// # async {
/// // A service taking a String as a request
/// let service = DatabaseService::new("127.0.0.1:8080");
///
/// // Map the request to a new request
/// let mut new_service = service.with(|id: u32| id.to_string());
///
/// // Call the new service
/// let id = 13;
/// let response = new_service.call(id).await;
/// # };
/// # }
/// ```
fn with<F, NewRequest>(self, f: F) -> With<Self, F>
where
Self: Sized,
F: FnOnce(NewRequest) -> Request + Clone,
{
With::new(self, f)
}
/// Composes a fallible function *in front of* the service.
///
/// This adapter produces a new service that passes each value through the
/// given function `f` before sending it to `self`.
///
/// # Example
/// ```
/// # use std::convert::TryFrom;
/// # use std::task::{Poll, Context};
/// # use tower::{Service, ServiceExt};
/// #
/// # struct DatabaseService;
/// # impl DatabaseService {
/// # fn new(address: &str) -> Self {
/// # DatabaseService
/// # }
/// # }
/// #
/// # enum DbError {
/// # Parse(std::num::ParseIntError)
/// # }
/// #
/// # impl Service<u32> for DatabaseService {
/// # type Response = String;
/// # type Error = DbError;
/// # type Future = futures_util::future::Ready<Result<String, DbError>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(String::new()))
/// # }
/// # }
/// #
/// # fn main() {
/// # async {
/// // A service taking a u32 as a request and returning Result<_, DbError>
/// let service = DatabaseService::new("127.0.0.1:8080");
///
/// // Fallibly map the request to a new request
/// let mut new_service = service.try_with(|id_str: &str| id_str.parse().map_err(DbError::Parse));
///
/// // Call the new service
/// let id = "13";
/// let response = new_service.call(id).await;
/// # };
/// # }
/// ```
fn try_with<F, NewRequest>(self, f: F) -> TryWith<Self, F>
where
Self: Sized,
F: FnOnce(NewRequest) -> Result<Request, Self::Error> + Clone,
{
TryWith::new(self, f)
}
}
impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {}

View File

@ -0,0 +1,70 @@
use futures_util::future::{ready, Either, Ready};
use std::task::{Context, Poll};
use tower_layer::Layer;
use tower_service::Service;
/// Service returned by the [`try_with`] combinator.
///
/// [`try_with`]: crate::util::ServiceExt::try_with
#[derive(Debug)]
pub struct TryWith<S, F> {
inner: S,
f: F,
}
impl<S, F> TryWith<S, F> {
/// Creates a new [`TryWith`] service.
pub fn new(inner: S, f: F) -> Self {
TryWith { inner, f }
}
}
impl<S, F, R1, R2> Service<R1> for TryWith<S, F>
where
S: Service<R2>,
F: FnOnce(R1) -> Result<R2, S::Error> + Clone,
{
type Response = S::Response;
type Error = S::Error;
type Future = Either<S::Future, Ready<Result<S::Response, S::Error>>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, request: R1) -> Self::Future {
match (self.f.clone())(request) {
Ok(ok) => Either::Left(self.inner.call(ok)),
Err(err) => Either::Right(ready(Err(err))),
}
}
}
/// A [`Layer`] that produces a [`TryWith`] service.
///
/// [`Layer`]: tower_layer::Layer
#[derive(Debug)]
pub struct TryWithLayer<F> {
f: F,
}
impl<F> TryWithLayer<F> {
/// Creates a new [`TryWithLayer`].
pub fn new(f: F) -> Self {
TryWithLayer { f }
}
}
impl<S, F> Layer<S> for TryWithLayer<F>
where
F: Clone,
{
type Service = TryWith<S, F>;
fn layer(&self, inner: S) -> Self::Service {
TryWith {
f: self.f.clone(),
inner,
}
}
}

66
tower/src/util/with.rs Normal file
View File

@ -0,0 +1,66 @@
use std::task::{Context, Poll};
use tower_layer::Layer;
use tower_service::Service;
/// Service returned by the [`with`] combinator.
///
/// [`with`]: crate::util::ServiceExt::with
#[derive(Debug)]
pub struct With<S, F> {
inner: S,
f: F,
}
impl<S, F> With<S, F> {
/// Creates a new [`With`] service.
pub fn new(inner: S, f: F) -> Self {
With { inner, f }
}
}
impl<S, F, R1, R2> Service<R1> for With<S, F>
where
S: Service<R2>,
F: FnOnce(R1) -> R2 + Clone,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, request: R1) -> S::Future {
self.inner.call((self.f.clone())(request))
}
}
/// A [`Layer`] that produces a [`With`] service.
///
/// [`Layer`]: tower_layer::Layer
#[derive(Debug)]
pub struct WithLayer<F> {
f: F,
}
impl<F> WithLayer<F> {
/// Creates a new [`WithLayer`].
pub fn new(f: F) -> Self {
WithLayer { f }
}
}
impl<S, F> Layer<S> for WithLayer<F>
where
F: Clone,
{
type Service = With<S, F>;
fn layer(&self, inner: S) -> Self::Service {
With {
f: self.f.clone(),
inner,
}
}
}