diff --git a/tower/src/util/map_err.rs b/tower/src/util/map_err.rs new file mode 100644 index 0000000..038dd6c --- /dev/null +++ b/tower/src/util/map_err.rs @@ -0,0 +1,67 @@ +use futures_util::{future::MapErr as MapErrFut, TryFutureExt}; +use std::task::{Context, Poll}; +use tower_layer::Layer; +use tower_service::Service; + +/// Service returned by the [`map_err`] combinator. +/// +/// [`map_err`]: crate::util::ServiceExt::map_err +#[derive(Debug)] +pub struct MapErr { + inner: S, + f: F, +} + +impl MapErr { + /// Creates a new [`MapErr`] service. + pub fn new(inner: S, f: F) -> Self { + MapErr { f, inner } + } +} + +impl Service for MapErr +where + S: Service, + F: FnOnce(S::Error) -> Error + Clone, +{ + type Response = S::Response; + type Error = Error; + type Future = MapErrFut; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(self.f.clone()) + } + + fn call(&mut self, request: Request) -> Self::Future { + self.inner.call(request).map_err(self.f.clone()) + } +} + +/// A [`Layer`] that produces a [`MapErr`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug)] +pub struct MapErrLayer { + f: F, +} + +impl MapErrLayer { + /// Creates a new [`MapErrLayer`]. + pub fn new(f: F) -> Self { + MapErrLayer { f } + } +} + +impl Layer for MapErrLayer +where + F: Clone, +{ + type Service = MapErr; + + fn layer(&self, inner: S) -> Self::Service { + MapErr { + f: self.f.clone(), + inner, + } + } +} diff --git a/tower/src/util/map_ok.rs b/tower/src/util/map_ok.rs new file mode 100644 index 0000000..a021fa5 --- /dev/null +++ b/tower/src/util/map_ok.rs @@ -0,0 +1,67 @@ +use futures_util::{future::MapOk as MapOkFut, TryFutureExt}; +use std::task::{Context, Poll}; +use tower_layer::Layer; +use tower_service::Service; + +/// Service returned by the [`map_ok`] combinator. +/// +/// [`map_ok`]: crate::util::ServiceExt::map_ok +#[derive(Debug)] +pub struct MapOk { + inner: S, + f: F, +} + +impl MapOk { + /// Creates a new `MapOk` service. + pub fn new(inner: S, f: F) -> Self { + MapOk { f, inner } + } +} + +impl Service for MapOk +where + S: Service, + F: FnOnce(S::Response) -> Response + Clone, +{ + type Response = Response; + type Error = S::Error; + type Future = MapOkFut; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + self.inner.call(request).map_ok(self.f.clone()) + } +} + +/// A [`Layer`] that produces a [`MapOk`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug)] +pub struct MapOkLayer { + f: F, +} + +impl MapOkLayer { + /// Creates a new [`MapOkLayer`] layer. + pub fn new(f: F) -> Self { + MapOkLayer { f } + } +} + +impl Layer for MapOkLayer +where + F: Clone, +{ + type Service = MapOk; + + fn layer(&self, inner: S) -> Self::Service { + MapOk { + f: self.f.clone(), + inner, + } + } +} diff --git a/tower/src/util/mod.rs b/tower/src/util/mod.rs index b33d25d..7dadec6 100644 --- a/tower/src/util/mod.rs +++ b/tower/src/util/mod.rs @@ -4,19 +4,27 @@ mod boxed; mod call_all; mod either; mod map; +mod map_err; +mod map_ok; mod oneshot; mod optional; mod ready; mod service_fn; +mod try_with; +mod with; pub use self::{ boxed::{BoxService, UnsyncBoxService}, either::Either, map::{MapRequest, MapRequestLayer, MapResponse, MapResponseLayer}, + map_err::{MapErr, MapErrLayer}, + map_ok::{MapOk, MapOkLayer}, oneshot::Oneshot, optional::Optional, ready::{Ready, ReadyAnd, ReadyOneshot}, service_fn::{service_fn, ServiceFn}, + try_with::{TryWith, TryWithLayer}, + with::{With, WithLayer}, }; pub use self::call_all::{CallAll, CallAllUnordered}; @@ -82,6 +90,244 @@ pub trait ServiceExt: tower_service::Service { { CallAll::new(self, reqs) } + + /// Maps this service's response value to a different value. This does not + /// alter the behaviour of the [`poll_ready`] method. + /// + /// This method can be used to change the [`Response`] type of the service + /// into a different type. It is similar to the [`Result::map`] + /// method. You can use this method to chain along a computation once the + /// services response has been resolved. + /// + /// [`Response`]: crate::Service::Response + /// [`poll_ready`]: crate::Service::poll_ready + /// + /// # Example + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Record { + /// # pub name: String, + /// # pub age: u16 + /// # } + /// # + /// # impl Service for DatabaseService { + /// # type Response = Record; + /// # type Error = u8; + /// # type Future = futures_util::future::Ready>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 })) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Map the response into a new response + /// let mut new_service = service.map_ok(|record| record.name); + /// + /// // Call the new service + /// let id = 13; + /// let name = new_service.call(id).await.unwrap(); + /// # }; + /// # } + /// ``` + fn map_ok(self, f: F) -> MapOk + where + Self: Sized, + F: FnOnce(Self::Response) -> Response + Clone, + { + MapOk::new(self, f) + } + + /// Maps this services's error value to a different value. This does not + /// alter the behaviour of the [`poll_ready`] method. + /// + /// This method can be used to change the [`Error`] type of the service + /// into a different type. It is similar to the [`Result::map_err`] method. + /// + /// [`Error`]: crate::Service::Error + /// [`poll_ready`]: crate::Service::poll_ready + /// + /// # Example + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Error { + /// # pub code: u32, + /// # pub message: String + /// # } + /// # + /// # impl Service for DatabaseService { + /// # type Response = String; + /// # type Error = Error; + /// # type Future = futures_util::future::Ready>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(String::new())) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result<_, Error> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Map the error to a new error + /// let mut new_service = service.map_err(|err| err.code); + /// + /// // Call the new service + /// let id = 13; + /// let code = new_service.call(id).await.unwrap_err(); + /// # }; + /// # } + /// ``` + fn map_err(self, f: F) -> MapErr + where + Self: Sized, + F: FnOnce(Self::Error) -> Error + Clone, + { + MapErr::new(self, f) + } + + /// Composes a function *in front of* the service. + /// + /// This adapter produces a new service that passes each value through the + /// given function `f` before sending it to `self`. + /// + /// # Example + /// ``` + /// # use std::convert::TryFrom; + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # impl Service for DatabaseService { + /// # type Response = String; + /// # type Error = u8; + /// # type Future = futures_util::future::Ready>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: String) -> Self::Future { + /// # futures_util::future::ready(Ok(String::new())) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service taking a String as a request + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Map the request to a new request + /// let mut new_service = service.with(|id: u32| id.to_string()); + /// + /// // Call the new service + /// let id = 13; + /// let response = new_service.call(id).await; + /// # }; + /// # } + /// ``` + fn with(self, f: F) -> With + where + Self: Sized, + F: FnOnce(NewRequest) -> Request + Clone, + { + With::new(self, f) + } + + /// Composes a fallible function *in front of* the service. + /// + /// This adapter produces a new service that passes each value through the + /// given function `f` before sending it to `self`. + /// + /// # Example + /// ``` + /// # use std::convert::TryFrom; + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # enum DbError { + /// # Parse(std::num::ParseIntError) + /// # } + /// # + /// # impl Service for DatabaseService { + /// # type Response = String; + /// # type Error = DbError; + /// # type Future = futures_util::future::Ready>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(String::new())) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service taking a u32 as a request and returning Result<_, DbError> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Fallibly map the request to a new request + /// let mut new_service = service.try_with(|id_str: &str| id_str.parse().map_err(DbError::Parse)); + /// + /// // Call the new service + /// let id = "13"; + /// let response = new_service.call(id).await; + /// # }; + /// # } + /// ``` + fn try_with(self, f: F) -> TryWith + where + Self: Sized, + F: FnOnce(NewRequest) -> Result + Clone, + { + TryWith::new(self, f) + } } impl ServiceExt for T where T: tower_service::Service {} diff --git a/tower/src/util/try_with.rs b/tower/src/util/try_with.rs new file mode 100644 index 0000000..ba8468d --- /dev/null +++ b/tower/src/util/try_with.rs @@ -0,0 +1,70 @@ +use futures_util::future::{ready, Either, Ready}; +use std::task::{Context, Poll}; +use tower_layer::Layer; +use tower_service::Service; + +/// Service returned by the [`try_with`] combinator. +/// +/// [`try_with`]: crate::util::ServiceExt::try_with +#[derive(Debug)] +pub struct TryWith { + inner: S, + f: F, +} + +impl TryWith { + /// Creates a new [`TryWith`] service. + pub fn new(inner: S, f: F) -> Self { + TryWith { inner, f } + } +} + +impl Service for TryWith +where + S: Service, + F: FnOnce(R1) -> Result + Clone, +{ + type Response = S::Response; + type Error = S::Error; + type Future = Either>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: R1) -> Self::Future { + match (self.f.clone())(request) { + Ok(ok) => Either::Left(self.inner.call(ok)), + Err(err) => Either::Right(ready(Err(err))), + } + } +} + +/// A [`Layer`] that produces a [`TryWith`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug)] +pub struct TryWithLayer { + f: F, +} + +impl TryWithLayer { + /// Creates a new [`TryWithLayer`]. + pub fn new(f: F) -> Self { + TryWithLayer { f } + } +} + +impl Layer for TryWithLayer +where + F: Clone, +{ + type Service = TryWith; + + fn layer(&self, inner: S) -> Self::Service { + TryWith { + f: self.f.clone(), + inner, + } + } +} diff --git a/tower/src/util/with.rs b/tower/src/util/with.rs new file mode 100644 index 0000000..cd0c508 --- /dev/null +++ b/tower/src/util/with.rs @@ -0,0 +1,66 @@ +use std::task::{Context, Poll}; +use tower_layer::Layer; +use tower_service::Service; + +/// Service returned by the [`with`] combinator. +/// +/// [`with`]: crate::util::ServiceExt::with +#[derive(Debug)] +pub struct With { + inner: S, + f: F, +} + +impl With { + /// Creates a new [`With`] service. + pub fn new(inner: S, f: F) -> Self { + With { inner, f } + } +} + +impl Service for With +where + S: Service, + F: FnOnce(R1) -> R2 + Clone, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: R1) -> S::Future { + self.inner.call((self.f.clone())(request)) + } +} + +/// A [`Layer`] that produces a [`With`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug)] +pub struct WithLayer { + f: F, +} + +impl WithLayer { + /// Creates a new [`WithLayer`]. + pub fn new(f: F) -> Self { + WithLayer { f } + } +} + +impl Layer for WithLayer +where + F: Clone, +{ + type Service = With; + + fn layer(&self, inner: S) -> Self::Service { + With { + f: self.f.clone(), + inner, + } + } +}