Add combinators for working with Services (#104)
This commit is contained in:
parent
4ffe24f485
commit
c29f7e97ba
|
@ -0,0 +1,164 @@
|
|||
use futures::{Future, Poll};
|
||||
use tower_service::Service;
|
||||
|
||||
/// Service for the `and_then` combinator, chaining a computation onto the end of
|
||||
/// another service which completes successfully.
|
||||
///
|
||||
/// This is created by the `ServiceExt::and_then` method.
|
||||
pub struct AndThen<A, B> {
|
||||
a: A,
|
||||
b: B,
|
||||
}
|
||||
|
||||
impl<A, B> AndThen<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = A::Response, Error = A::Error> + Clone,
|
||||
{
|
||||
/// Create new `AndThen` combinator
|
||||
pub fn new(a: A, b: B) -> AndThen<A, B> {
|
||||
AndThen { a, b }
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Clone for AndThen<A, B>
|
||||
where
|
||||
A: Service + Clone,
|
||||
B: Service<Request = A::Response, Error = A::Error> + Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
AndThen {
|
||||
a: self.a.clone(),
|
||||
b: self.b.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Service for AndThen<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = A::Response, Error = A::Error> + Clone,
|
||||
{
|
||||
type Request = A::Request;
|
||||
type Response = B::Response;
|
||||
type Error = B::Error;
|
||||
type Future = AndThenFuture<A, B>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
let _ = try_ready!(self.a.poll_ready());
|
||||
self.b.poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
AndThenFuture::new(self.a.call(req), self.b.clone())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AndThenFuture<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = A::Response, Error = A::Error>,
|
||||
{
|
||||
b: B,
|
||||
fut_b: Option<B::Future>,
|
||||
fut_a: A::Future,
|
||||
}
|
||||
|
||||
impl<A, B> AndThenFuture<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = A::Response, Error = A::Error>,
|
||||
{
|
||||
fn new(fut_a: A::Future, b: B) -> Self {
|
||||
AndThenFuture {
|
||||
b,
|
||||
fut_a,
|
||||
fut_b: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Future for AndThenFuture<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = A::Response, Error = A::Error>,
|
||||
{
|
||||
type Item = B::Response;
|
||||
type Error = B::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Some(ref mut fut) = self.fut_b {
|
||||
return fut.poll();
|
||||
}
|
||||
|
||||
let resp = try_ready!(self.fut_a.poll());
|
||||
self.fut_b = Some(self.b.call(resp));
|
||||
self.poll()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::future::{ok, FutureResult};
|
||||
use futures::{Async, Poll};
|
||||
use std::cell::Cell;
|
||||
use std::rc::Rc;
|
||||
|
||||
use super::*;
|
||||
use ServiceExt;
|
||||
|
||||
struct Srv1(Rc<Cell<usize>>);
|
||||
impl Service for Srv1 {
|
||||
type Request = &'static str;
|
||||
type Response = &'static str;
|
||||
type Error = ();
|
||||
type Future = FutureResult<Self::Response, ()>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.0.set(self.0.get() + 1);
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
ok(req)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Srv2(Rc<Cell<usize>>);
|
||||
|
||||
impl Service for Srv2 {
|
||||
type Request = &'static str;
|
||||
type Response = (&'static str, &'static str);
|
||||
type Error = ();
|
||||
type Future = FutureResult<Self::Response, ()>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.0.set(self.0.get() + 1);
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
ok((req, "srv2"))
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_poll_ready() {
|
||||
let cnt = Rc::new(Cell::new(0));
|
||||
let mut srv = Srv1(cnt.clone()).and_then(Srv2(cnt.clone()));
|
||||
let res = srv.poll_ready();
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), Async::Ready(()));
|
||||
assert_eq!(cnt.get(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_call() {
|
||||
let cnt = Rc::new(Cell::new(0));
|
||||
let mut srv = Srv1(cnt.clone()).and_then(Srv2(cnt));
|
||||
let res = srv.call("srv1").poll();
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), Async::Ready(("srv1", "srv2")));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,98 @@
|
|||
use std::marker::PhantomData;
|
||||
|
||||
use futures::{Future, IntoFuture, Poll};
|
||||
use tower_service::Service;
|
||||
|
||||
/// `Apply` service combinator
|
||||
pub struct Apply<T, F, R, Req> {
|
||||
service: T,
|
||||
f: F,
|
||||
_r: PhantomData<Fn(Req) -> R>,
|
||||
}
|
||||
|
||||
impl<T, F, R, Req> Apply<T, F, R, Req>
|
||||
where
|
||||
T: Service<Error = R::Error> + Clone,
|
||||
F: Fn(Req, T) -> R,
|
||||
R: IntoFuture,
|
||||
{
|
||||
/// Create new `Apply` combinator
|
||||
pub fn new(f: F, service: T) -> Self {
|
||||
Self {
|
||||
service,
|
||||
f,
|
||||
_r: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, F, R, Req> Clone for Apply<T, F, R, Req>
|
||||
where
|
||||
T: Service<Error = R::Error> + Clone,
|
||||
F: Fn(Req, T) -> R + Clone,
|
||||
R: IntoFuture,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Apply {
|
||||
service: self.service.clone(),
|
||||
f: self.f.clone(),
|
||||
_r: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, F, R, Req> Service for Apply<T, F, R, Req>
|
||||
where
|
||||
T: Service<Error = R::Error> + Clone,
|
||||
F: Fn(Req, T) -> R,
|
||||
R: IntoFuture,
|
||||
{
|
||||
type Request = Req;
|
||||
type Response = <R::Future as Future>::Item;
|
||||
type Error = <R::Future as Future>::Error;
|
||||
type Future = R::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.service.poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
let service = self.service.clone();
|
||||
(self.f)(req, service).into_future()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::future::{ok, FutureResult};
|
||||
use futures::{Async, Future, Poll};
|
||||
use tower_service::Service;
|
||||
|
||||
use ext::ServiceExt;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Srv;
|
||||
impl Service for Srv {
|
||||
type Request = ();
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = FutureResult<(), ()>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_call() {
|
||||
let mut srv =
|
||||
Srv.apply(|req: &'static str, mut srv| srv.call(()).map(move |res| (req, res)));
|
||||
let res = srv.call("srv").poll();
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), Async::Ready(("srv", ())));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
use std::marker::PhantomData;
|
||||
|
||||
use futures::{Future, Poll};
|
||||
use tower_service::Service;
|
||||
|
||||
/// Service for the `from_err` combinator, changing the error type of a service.
|
||||
///
|
||||
/// This is created by the `ServiceExt::from_err` method.
|
||||
pub struct FromErr<A, E>
|
||||
where
|
||||
A: Service,
|
||||
{
|
||||
service: A,
|
||||
_e: PhantomData<E>,
|
||||
}
|
||||
|
||||
impl<A: Service, E: From<A::Error>> FromErr<A, E> {
|
||||
pub(crate) fn new(service: A) -> Self {
|
||||
FromErr {
|
||||
service,
|
||||
_e: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, E> Clone for FromErr<A, E>
|
||||
where
|
||||
A: Service + Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
FromErr {
|
||||
service: self.service.clone(),
|
||||
_e: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, E> Service for FromErr<A, E>
|
||||
where
|
||||
A: Service,
|
||||
E: From<A::Error>,
|
||||
{
|
||||
type Request = A::Request;
|
||||
type Response = A::Response;
|
||||
type Error = E;
|
||||
type Future = FromErrFuture<A, E>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), E> {
|
||||
Ok(self.service.poll_ready().map_err(E::from)?)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
FromErrFuture {
|
||||
fut: self.service.call(req),
|
||||
f: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FromErrFuture<A: Service, E> {
|
||||
fut: A::Future,
|
||||
f: PhantomData<E>,
|
||||
}
|
||||
|
||||
impl<A, E> Future for FromErrFuture<A, E>
|
||||
where
|
||||
A: Service,
|
||||
E: From<A::Error>,
|
||||
{
|
||||
type Item = A::Response;
|
||||
type Error = E;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
self.fut.poll().map_err(E::from)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::future::{err, FutureResult};
|
||||
|
||||
use super::*;
|
||||
use ServiceExt;
|
||||
|
||||
struct Srv;
|
||||
|
||||
impl Service for Srv {
|
||||
type Request = ();
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = FutureResult<(), ()>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Err(())
|
||||
}
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
err(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct Error;
|
||||
|
||||
impl From<()> for Error {
|
||||
fn from(_: ()) -> Self {
|
||||
Error
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_poll_ready() {
|
||||
let mut srv = Srv.from_err::<Error>();
|
||||
let res = srv.poll_ready();
|
||||
assert!(res.is_err());
|
||||
assert_eq!(res.err().unwrap(), Error);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_call() {
|
||||
let mut srv = Srv.from_err::<Error>();
|
||||
let res = srv.call(()).poll();
|
||||
assert!(res.is_err());
|
||||
assert_eq!(res.err().unwrap(), Error);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,131 @@
|
|||
use futures::{Async, Future, Poll};
|
||||
use tower_service::Service;
|
||||
|
||||
/// Service for the `map` combinator, changing the type of a service's response.
|
||||
///
|
||||
/// This is created by the `ServiceExt::map` method.
|
||||
pub struct Map<T, F, R>
|
||||
where
|
||||
T: Service,
|
||||
F: Fn(T::Response) -> R + Clone,
|
||||
{
|
||||
service: T,
|
||||
f: F,
|
||||
}
|
||||
|
||||
impl<T, F, R> Map<T, F, R>
|
||||
where
|
||||
T: Service,
|
||||
F: Fn(T::Response) -> R + Clone,
|
||||
{
|
||||
/// Create new `Map` combinator
|
||||
pub fn new(service: T, f: F) -> Self {
|
||||
Map { service, f }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, F, R> Clone for Map<T, F, R>
|
||||
where
|
||||
T: Service + Clone,
|
||||
F: Fn(T::Response) -> R + Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Map {
|
||||
service: self.service.clone(),
|
||||
f: self.f.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, F, R> Service for Map<T, F, R>
|
||||
where
|
||||
T: Service,
|
||||
F: Fn(T::Response) -> R + Clone,
|
||||
{
|
||||
type Request = T::Request;
|
||||
type Response = R;
|
||||
type Error = T::Error;
|
||||
type Future = MapFuture<T, F, R>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.service.poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
MapFuture::new(self.service.call(req), self.f.clone())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MapFuture<T, F, R>
|
||||
where
|
||||
T: Service,
|
||||
F: Fn(T::Response) -> R,
|
||||
{
|
||||
f: F,
|
||||
fut: T::Future,
|
||||
}
|
||||
|
||||
impl<T, F, R> MapFuture<T, F, R>
|
||||
where
|
||||
T: Service,
|
||||
F: Fn(T::Response) -> R,
|
||||
{
|
||||
fn new(fut: T::Future, f: F) -> Self {
|
||||
MapFuture { f, fut }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, F, R> Future for MapFuture<T, F, R>
|
||||
where
|
||||
T: Service,
|
||||
F: Fn(T::Response) -> R,
|
||||
{
|
||||
type Item = R;
|
||||
type Error = T::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let resp = try_ready!(self.fut.poll());
|
||||
Ok(Async::Ready((self.f)(resp)))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::future::{ok, FutureResult};
|
||||
|
||||
use super::*;
|
||||
use ServiceExt;
|
||||
|
||||
struct Srv;
|
||||
|
||||
impl Service for Srv {
|
||||
type Request = ();
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = FutureResult<(), ()>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_poll_ready() {
|
||||
let mut srv = Srv.map(|_| "ok");
|
||||
let res = srv.poll_ready();
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), Async::Ready(()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_call() {
|
||||
let mut srv = Srv.map(|_| "ok");
|
||||
let res = srv.call(()).poll();
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), Async::Ready("ok"));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,130 @@
|
|||
use futures::{Future, Poll};
|
||||
use tower_service::Service;
|
||||
|
||||
/// Service for the `map_err` combinator, changing the type of a service's error.
|
||||
///
|
||||
/// This is created by the `ServiceExt::map_err` method.
|
||||
pub struct MapErr<T, F, E>
|
||||
where
|
||||
T: Service,
|
||||
F: Fn(T::Error) -> E + Clone,
|
||||
{
|
||||
service: T,
|
||||
f: F,
|
||||
}
|
||||
|
||||
impl<T, F, E> MapErr<T, F, E>
|
||||
where
|
||||
T: Service,
|
||||
F: Fn(T::Error) -> E + Clone,
|
||||
{
|
||||
/// Create new `MapErr` combinator
|
||||
pub fn new(service: T, f: F) -> Self {
|
||||
MapErr { service, f }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, F, E> Clone for MapErr<T, F, E>
|
||||
where
|
||||
T: Service + Clone,
|
||||
F: Fn(T::Error) -> E + Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
MapErr {
|
||||
service: self.service.clone(),
|
||||
f: self.f.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, F, E> Service for MapErr<T, F, E>
|
||||
where
|
||||
T: Service,
|
||||
F: Fn(T::Error) -> E + Clone,
|
||||
{
|
||||
type Request = T::Request;
|
||||
type Response = T::Response;
|
||||
type Error = E;
|
||||
type Future = MapErrFuture<T, F, E>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.service.poll_ready().map_err(&self.f)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
MapErrFuture::new(self.service.call(req), self.f.clone())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MapErrFuture<T, F, E>
|
||||
where
|
||||
T: Service,
|
||||
F: Fn(T::Error) -> E,
|
||||
{
|
||||
f: F,
|
||||
fut: T::Future,
|
||||
}
|
||||
|
||||
impl<T, F, E> MapErrFuture<T, F, E>
|
||||
where
|
||||
T: Service,
|
||||
F: Fn(T::Error) -> E,
|
||||
{
|
||||
fn new(fut: T::Future, f: F) -> Self {
|
||||
MapErrFuture { f, fut }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, F, E> Future for MapErrFuture<T, F, E>
|
||||
where
|
||||
T: Service,
|
||||
F: Fn(T::Error) -> E,
|
||||
{
|
||||
type Item = T::Response;
|
||||
type Error = E;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
self.fut.poll().map_err(&self.f)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::future::{err, FutureResult};
|
||||
|
||||
use super::*;
|
||||
use ServiceExt;
|
||||
|
||||
struct Srv;
|
||||
|
||||
impl Service for Srv {
|
||||
type Request = ();
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = FutureResult<(), ()>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Err(())
|
||||
}
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
err(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_poll_ready() {
|
||||
let mut srv = Srv.map_err(|_| "error");
|
||||
let res = srv.poll_ready();
|
||||
assert!(res.is_err());
|
||||
assert_eq!(res.err().unwrap(), "error");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_call() {
|
||||
let mut srv = Srv.map_err(|_| "error");
|
||||
let res = srv.call(()).poll();
|
||||
assert!(res.is_err());
|
||||
assert_eq!(res.err().unwrap(), "error");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
//! Combinators for working with `Service`s
|
||||
|
||||
use futures::IntoFuture;
|
||||
use tower_service::Service;
|
||||
|
||||
mod and_then;
|
||||
mod apply;
|
||||
mod from_err;
|
||||
mod map;
|
||||
mod map_err;
|
||||
mod then;
|
||||
|
||||
pub use self::and_then::AndThen;
|
||||
pub use self::apply::Apply;
|
||||
pub use self::from_err::FromErr;
|
||||
pub use self::map::Map;
|
||||
pub use self::map_err::MapErr;
|
||||
pub use self::then::Then;
|
||||
|
||||
impl<T: ?Sized> ServiceExt for T where T: Service {}
|
||||
|
||||
/// An extension trait for `Service`s that provides a variety of convenient
|
||||
/// adapters
|
||||
pub trait ServiceExt: Service {
|
||||
fn apply<F, R, Req>(self, f: F) -> Apply<Self, F, R, Req>
|
||||
where
|
||||
Self: Clone + Sized,
|
||||
F: Fn(Req, Self) -> R,
|
||||
R: IntoFuture<Error = Self::Error>,
|
||||
{
|
||||
Apply::new(f, self)
|
||||
}
|
||||
|
||||
/// Call another service after call to this one has resolved successfully.
|
||||
///
|
||||
/// This function can be used to chain two services together and ensure that
|
||||
/// the second service isn't called until call to the fist service have finished.
|
||||
/// Result of the call to the first service is used as an input parameter
|
||||
/// for the second service's call.
|
||||
///
|
||||
/// Note that this function consumes the receiving service and returns a
|
||||
/// wrapped version of it.
|
||||
fn and_then<B>(self, service: B) -> AndThen<Self, B>
|
||||
where
|
||||
Self: Sized,
|
||||
B: Service<Request = Self::Response, Error = Self::Error> + Clone,
|
||||
{
|
||||
AndThen::new(self, service)
|
||||
}
|
||||
|
||||
/// Map this service's error to any error implementing `From` for
|
||||
/// this service`s `Error`.
|
||||
///
|
||||
/// Note that this function consumes the receiving service and returns a
|
||||
/// wrapped version of it.
|
||||
fn from_err<E>(self) -> FromErr<Self, E>
|
||||
where
|
||||
Self: Sized,
|
||||
E: From<Self::Error>,
|
||||
{
|
||||
FromErr::new(self)
|
||||
}
|
||||
|
||||
/// Chain on a computation for when a call to the service finished,
|
||||
/// passing the result of the call to the next service `B`.
|
||||
///
|
||||
/// Note that this function consumes the receiving future and returns a
|
||||
/// wrapped version of it.
|
||||
fn then<B>(self, service: B) -> Then<Self, B>
|
||||
where
|
||||
Self: Sized,
|
||||
B: Service<Request = Result<Self::Response, Self::Error>, Error = Self::Error> + Clone,
|
||||
{
|
||||
Then::new(self, service)
|
||||
}
|
||||
|
||||
/// Map this service's output to a different type, returning a new service of
|
||||
/// the resulting type.
|
||||
///
|
||||
/// This function is similar to the `Option::map` or `Iterator::map` where
|
||||
/// it will change the type of the underlying service.
|
||||
///
|
||||
/// Note that this function consumes the receiving service and returns a
|
||||
/// wrapped version of it, similar to the existing `map` methods in the
|
||||
/// standard library.
|
||||
fn map<F, R>(self, f: F) -> Map<Self, F, R>
|
||||
where
|
||||
Self: Sized,
|
||||
F: Fn(Self::Response) -> R + Clone,
|
||||
{
|
||||
Map::new(self, f)
|
||||
}
|
||||
|
||||
/// Map this service's error to a different error, returning a new service.
|
||||
///
|
||||
/// This function is similar to the `Result::map_err` where it will change
|
||||
/// the error type of the underlying service. This is useful for example to
|
||||
/// ensure that services have the same error type.
|
||||
///
|
||||
/// Note that this function consumes the receiving service and returns a
|
||||
/// wrapped version of it.
|
||||
fn map_err<F, E>(self, f: F) -> MapErr<Self, F, E>
|
||||
where
|
||||
Self: Sized,
|
||||
F: Fn(Self::Error) -> E + Clone,
|
||||
{
|
||||
MapErr::new(self, f)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,183 @@
|
|||
use futures::{Async, Future, Poll};
|
||||
use tower_service::Service;
|
||||
|
||||
/// Service for the `then` combinator, chaining a computation onto the end of
|
||||
/// another service.
|
||||
///
|
||||
/// This is created by the `ServiceExt::then` method.
|
||||
pub struct Then<A, B> {
|
||||
a: A,
|
||||
b: B,
|
||||
}
|
||||
|
||||
impl<A, B> Then<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = Result<A::Response, A::Error>, Error = A::Error> + Clone,
|
||||
{
|
||||
/// Create new `Then` combinator
|
||||
pub fn new(a: A, b: B) -> Then<A, B> {
|
||||
Then { a, b }
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Clone for Then<A, B>
|
||||
where
|
||||
A: Service + Clone,
|
||||
B: Service + Clone,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Then {
|
||||
a: self.a.clone(),
|
||||
b: self.b.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Service for Then<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = Result<A::Response, A::Error>, Error = A::Error> + Clone,
|
||||
{
|
||||
type Request = A::Request;
|
||||
type Response = B::Response;
|
||||
type Error = B::Error;
|
||||
type Future = ThenFuture<A, B>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
let _ = try_ready!(self.a.poll_ready());
|
||||
self.b.poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
ThenFuture::new(self.a.call(req), self.b.clone())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ThenFuture<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = Result<A::Response, A::Error>>,
|
||||
{
|
||||
b: B,
|
||||
fut_b: Option<B::Future>,
|
||||
fut_a: A::Future,
|
||||
}
|
||||
|
||||
impl<A, B> ThenFuture<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = Result<A::Response, A::Error>>,
|
||||
{
|
||||
fn new(fut_a: A::Future, b: B) -> Self {
|
||||
ThenFuture {
|
||||
b,
|
||||
fut_a,
|
||||
fut_b: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Future for ThenFuture<A, B>
|
||||
where
|
||||
A: Service,
|
||||
B: Service<Request = Result<A::Response, A::Error>>,
|
||||
{
|
||||
type Item = B::Response;
|
||||
type Error = B::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Some(ref mut fut) = self.fut_b {
|
||||
return fut.poll();
|
||||
}
|
||||
|
||||
match self.fut_a.poll() {
|
||||
Ok(Async::Ready(resp)) => {
|
||||
self.fut_b = Some(self.b.call(Ok(resp)));
|
||||
self.poll()
|
||||
}
|
||||
Err(err) => {
|
||||
self.fut_b = Some(self.b.call(Err(err)));
|
||||
self.poll()
|
||||
}
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::future::{err, ok, FutureResult};
|
||||
use futures::{Async, Poll};
|
||||
use std::cell::Cell;
|
||||
use std::rc::Rc;
|
||||
|
||||
use super::*;
|
||||
use ServiceExt;
|
||||
|
||||
struct Srv1(Rc<Cell<usize>>);
|
||||
impl Service for Srv1 {
|
||||
type Request = Result<&'static str, &'static str>;
|
||||
type Response = &'static str;
|
||||
type Error = ();
|
||||
type Future = FutureResult<Self::Response, Self::Error>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.0.set(self.0.get() + 1);
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
match req {
|
||||
Ok(msg) => ok(msg),
|
||||
Err(_) => err(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Srv2(Rc<Cell<usize>>);
|
||||
|
||||
impl Service for Srv2 {
|
||||
type Request = Result<&'static str, ()>;
|
||||
type Response = (&'static str, &'static str);
|
||||
type Error = ();
|
||||
type Future = FutureResult<Self::Response, ()>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.0.set(self.0.get() + 1);
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
match req {
|
||||
Ok(msg) => ok((msg, "ok")),
|
||||
Err(()) => ok(("srv2", "err")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_poll_ready() {
|
||||
let cnt = Rc::new(Cell::new(0));
|
||||
let mut srv = Srv1(cnt.clone()).then(Srv2(cnt.clone()));
|
||||
let res = srv.poll_ready();
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), Async::Ready(()));
|
||||
assert_eq!(cnt.get(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_call() {
|
||||
let cnt = Rc::new(Cell::new(0));
|
||||
let mut srv = Srv1(cnt.clone()).then(Srv2(cnt));
|
||||
|
||||
let res = srv.call(Ok("srv1")).poll();
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok")));
|
||||
|
||||
let res = srv.call(Err("srv")).poll();
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), Async::Ready(("srv2", "err")));
|
||||
}
|
||||
}
|
|
@ -1,14 +1,17 @@
|
|||
//! Various utility types and functions that are generally with Tower.
|
||||
|
||||
#[macro_use]
|
||||
extern crate futures;
|
||||
extern crate tower_service;
|
||||
|
||||
pub mod either;
|
||||
pub mod option;
|
||||
pub mod boxed;
|
||||
pub mod either;
|
||||
pub mod ext;
|
||||
pub mod option;
|
||||
mod service_fn;
|
||||
|
||||
pub use boxed::BoxService;
|
||||
pub use either::EitherService;
|
||||
pub use service_fn::{NewServiceFn};
|
||||
pub use ext::ServiceExt;
|
||||
pub use option::OptionService;
|
||||
pub use service_fn::NewServiceFn;
|
||||
|
|
Loading…
Reference in New Issue