diff --git a/Cargo.toml b/Cargo.toml index 078fee7..c1e948e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,6 @@ members = [ "tower-service-util", "tower-timeout", ] + +[patch.'https://github.com/tower-rs/tower'] +tower-retry = { path = "tower-retry" } \ No newline at end of file diff --git a/tower-in-flight-limit/src/layer.rs b/tower-in-flight-limit/src/layer.rs index 4e5e945..4f13f13 100644 --- a/tower-in-flight-limit/src/layer.rs +++ b/tower-in-flight-limit/src/layer.rs @@ -1,6 +1,6 @@ use tower_layer::Layer; use tower_service::Service; -use {Error, InFlightLimit}; +use {Error, InFlightLimit, Never}; #[derive(Debug, Clone)] pub struct InFlightLimitLayer { @@ -20,7 +20,7 @@ where { type Response = S::Response; type Error = Error; - type LayerError = (); + type LayerError = Never; type Service = InFlightLimit; fn layer(&self, service: S) -> Result { diff --git a/tower-in-flight-limit/src/lib.rs b/tower-in-flight-limit/src/lib.rs index 8e3f701..1b4e898 100644 --- a/tower-in-flight-limit/src/lib.rs +++ b/tower-in-flight-limit/src/lib.rs @@ -9,9 +9,11 @@ extern crate tower_service; pub mod future; mod layer; +mod never; use future::ResponseFuture; pub use layer::InFlightLimitLayer; +use never::Never; use tower_service::Service; @@ -31,7 +33,7 @@ struct Limit { permit: semaphore::Permit, } -type Error = Box<::std::error::Error + Send + Sync>; +type Error = Box; // ===== impl InFlightLimit ===== diff --git a/tower-in-flight-limit/src/never.rs b/tower-in-flight-limit/src/never.rs new file mode 100644 index 0000000..3bb12aa --- /dev/null +++ b/tower-in-flight-limit/src/never.rs @@ -0,0 +1,12 @@ +use std::fmt; +#[derive(Debug)] +/// An error that can never occur. +pub enum Never {} + +impl fmt::Display for Never { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + match *self {} + } +} + +impl std::error::Error for Never {} diff --git a/tower-layer/Cargo.toml b/tower-layer/Cargo.toml index fed2d95..ebefd6e 100644 --- a/tower-layer/Cargo.toml +++ b/tower-layer/Cargo.toml @@ -20,6 +20,9 @@ categories = ["asynchronous", "network-programming"] futures = "0.1" tower-service = "0.2.0" +[dev-dependencies] +void = "1" + [features] default = ["util"] util = [] diff --git a/tower-layer/src/lib.rs b/tower-layer/src/lib.rs index 0f35bb3..75e55a9 100644 --- a/tower-layer/src/lib.rs +++ b/tower-layer/src/lib.rs @@ -34,10 +34,12 @@ use tower_service::Service; /// ```rust /// # extern crate futures; /// # extern crate tower_service; +/// # extern crate void; /// # use tower_service::Service; /// # use futures::{Poll, Async}; /// # use tower_layer::Layer; /// # use std::fmt; +/// # use void::Void; /// /// pub struct LogLayer { /// target: &'static str, @@ -50,7 +52,7 @@ use tower_service::Service; /// { /// type Response = S::Response; /// type Error = S::Error; -/// type LayerError = (); +/// type LayerError = Void; /// type Service = LogService; /// /// fn layer(&self, service: S) -> Result { diff --git a/tower-layer/src/util/chain.rs b/tower-layer/src/util/chain.rs index 7cbfe50..b49139e 100644 --- a/tower-layer/src/util/chain.rs +++ b/tower-layer/src/util/chain.rs @@ -10,13 +10,7 @@ pub struct Chain { outer: Outer, } -/// Error's produced when chaining two layers together -pub enum ChainError { - /// Error produced from the inner layer call - Inner(I), - /// Error produced from the outer layer call - Outer(O), -} +type Error = Box; impl Chain { /// Create a new `Chain`. @@ -29,19 +23,18 @@ impl Layer for Chain where S: Service, Inner: Layer, + Inner::LayerError: Into, Outer: Layer, + Outer::LayerError: Into, { type Response = Outer::Response; type Error = Outer::Error; - type LayerError = ChainError; + type LayerError = Error; type Service = Outer::Service; fn layer(&self, service: S) -> Result { - let inner = self - .inner - .layer(service) - .map_err(|e| ChainError::Inner(e))?; + let inner = self.inner.layer(service).map_err(Into::into)?; - self.outer.layer(inner).map_err(|e| ChainError::Outer(e)) + self.outer.layer(inner).map_err(Into::into) } } diff --git a/tower-layer/src/util/identity.rs b/tower-layer/src/util/identity.rs index 072dba0..ebf5bdb 100644 --- a/tower-layer/src/util/identity.rs +++ b/tower-layer/src/util/identity.rs @@ -1,4 +1,3 @@ -use std::error::Error; use std::fmt; use tower_service::Service; use Layer; diff --git a/tower-layer/src/util/mod.rs b/tower-layer/src/util/mod.rs index ceafb65..d8a1ac6 100644 --- a/tower-layer/src/util/mod.rs +++ b/tower-layer/src/util/mod.rs @@ -5,7 +5,7 @@ use Layer; mod chain; mod identity; -pub use self::chain::{Chain, ChainError}; +pub use self::chain::Chain; pub use self::identity::Identity; /// An extension trait for `Layer`'s that provides a variety of convenient @@ -17,7 +17,7 @@ pub trait LayerExt: Layer { /// This defines a middleware stack. fn chain(self, middleware: T) -> Chain where - T: Layer, + T: Layer, Self: Sized, { Chain::new(self, middleware) diff --git a/tower-retry/src/lib.rs b/tower-retry/src/lib.rs index f25b958..1e5b3a6 100644 --- a/tower-retry/src/lib.rs +++ b/tower-retry/src/lib.rs @@ -15,6 +15,9 @@ use tower_layer::Layer; use tower_service::Service; pub mod budget; +mod never; + +use never::Never; /// A "retry policy" to classify if a request should be retried. /// @@ -135,7 +138,7 @@ where { type Response = S::Response; type Error = S::Error; - type LayerError = (); + type LayerError = Never; type Service = Retry; fn layer(&self, service: S) -> Result { diff --git a/tower-retry/src/never.rs b/tower-retry/src/never.rs new file mode 100644 index 0000000..3bb12aa --- /dev/null +++ b/tower-retry/src/never.rs @@ -0,0 +1,12 @@ +use std::fmt; +#[derive(Debug)] +/// An error that can never occur. +pub enum Never {} + +impl fmt::Display for Never { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + match *self {} + } +} + +impl std::error::Error for Never {} diff --git a/tower-timeout/src/layer.rs b/tower-timeout/src/layer.rs index 4ac3c1d..66b796f 100644 --- a/tower-timeout/src/layer.rs +++ b/tower-timeout/src/layer.rs @@ -1,8 +1,8 @@ use crate::{Error, Timeout}; +use never::Never; use std::time::Duration; use tower_layer::Layer; use tower_service::Service; - /// Applies a timeout to requests via the supplied inner service. #[derive(Debug)] pub struct TimeoutLayer { @@ -23,7 +23,7 @@ where { type Response = S::Response; type Error = Error; - type LayerError = (); + type LayerError = Never; type Service = Timeout; fn layer(&self, service: S) -> Result { diff --git a/tower-timeout/src/lib.rs b/tower-timeout/src/lib.rs index 7ea883c..1c4399c 100644 --- a/tower-timeout/src/lib.rs +++ b/tower-timeout/src/lib.rs @@ -15,6 +15,7 @@ extern crate tower_service; pub mod error; pub mod future; mod layer; +mod never; pub use crate::layer::TimeoutLayer; diff --git a/tower-timeout/src/never.rs b/tower-timeout/src/never.rs new file mode 100644 index 0000000..3bb12aa --- /dev/null +++ b/tower-timeout/src/never.rs @@ -0,0 +1,12 @@ +use std::fmt; +#[derive(Debug)] +/// An error that can never occur. +pub enum Never {} + +impl fmt::Display for Never { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + match *self {} + } +} + +impl std::error::Error for Never {} diff --git a/tower/Cargo.toml b/tower/Cargo.toml index 07c9f3d..bf3e36c 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -16,13 +16,25 @@ keywords = ["io", "async", "non-blocking", "futures", "service"] [dependencies] futures = "0.1" tower-service = "0.2.0" +tower-layer = { version = "0.1", path = "../tower-layer" } tower-service-util = { version = "0.1.0", path = "../tower-service-util", features = ["io", "either"] } [dev-dependencies] -tower-service = "0.2.0" futures = "0.1" +tower-service = "0.2" +tower-in-flight-limit = { version = "0.1", path = "../tower-in-flight-limit" } +tower-rate-limit = { version = "0.1", path = "../tower-rate-limit" } +tower-reconnect = { version = "0.1", path = "../tower-reconnect" } +tower-retry = { version = "0.1", path = "../tower-retry" } +tower-buffer = { version = "0.1", path = "../tower-buffer" } +tower-hyper = { git = "https://github.com/tower-rs/tower-hyper" } +tower-layer = { version = "0.1", path = "../tower-layer" } +tokio-tcp = "0.1" +hyper = "0.12" log = "0.4.1" +tokio = "0.1" env_logger = { version = "0.5.3", default-features = false } tokio-timer = "0.1" futures-cpupool = "0.1" tokio-mock-task = "0.1" +void = "1" diff --git a/tower/examples/client.rs b/tower/examples/client.rs new file mode 100644 index 0000000..c9f630e --- /dev/null +++ b/tower/examples/client.rs @@ -0,0 +1,74 @@ +extern crate futures; +extern crate hyper; +extern crate tower; +extern crate tower_buffer; +extern crate tower_hyper; +extern crate tower_in_flight_limit; +extern crate tower_rate_limit; +extern crate tower_reconnect; +extern crate tower_retry; +extern crate tower_service; + +use futures::Future; +use hyper::client::connect::Destination; +use hyper::client::HttpConnector; +use hyper::{Request, Response, Uri}; +use std::time::Duration; +use tower::builder::ServiceBuilder; +use tower::ServiceExt; +use tower_buffer::BufferLayer; +use tower_hyper::client::{Builder, Connect}; +use tower_hyper::retry::{Body, RetryPolicy}; +use tower_hyper::util::Connector; +use tower_in_flight_limit::InFlightLimitLayer; +use tower_rate_limit::RateLimitLayer; +use tower_reconnect::Reconnect; +use tower_retry::RetryLayer; +use tower_service::Service; + +fn main() { + let fut = futures::lazy(|| { + request().map(|resp| { + dbg!(resp); + }) + }); + hyper::rt::run(fut) +} + +fn request() -> impl Future, Error = ()> { + let connector = Connector::new(HttpConnector::new(1)); + let hyper = Connect::new(connector, Builder::new()); + + // RetryPolicy is a very simple policy that retries `n` times + // if the response has a 500 status code. Here, `n` is 5. + let policy = RetryPolicy::new(5); + // We're calling the tower/examples/server.rs. + let dst = Destination::try_from_uri(Uri::from_static("http://127.0.0.1:3000")).unwrap(); + + // Now, to build the service! We use two BufferLayers in order to: + // - provide backpressure for the RateLimitLayer, RateLimitLayer, and InFlightLimitLayer + // - meet `RetryLayer`'s requirement that our service implement `Service + Clone` + // - ..and to provide cheap clones on the service. + let maker = ServiceBuilder::new() + .layer(BufferLayer::new(5)) + .layer(RateLimitLayer::new(5, Duration::from_secs(1))) + .layer(InFlightLimitLayer::new(5)) + .layer(RetryLayer::new(policy)) + .layer(BufferLayer::new(5)) + .build_make_service(hyper); + + // `Reconnect` accepts a destination and a MakeService, creating a new service + // any time the connection encounters an error. + let client = Reconnect::new(maker, dst); + + let request = Request::builder() + .method("GET") + .body(Body::from(Vec::new())) + .unwrap(); + + // we check to see if the client is ready to accept requests. + client + .ready() + .map_err(|e| panic!("Service is not ready: {:?}", e)) + .and_then(|mut c| c.call(request).map_err(|e| panic!("{:?}", e))) +} diff --git a/tower/examples/server.rs b/tower/examples/server.rs new file mode 100644 index 0000000..5a9292c --- /dev/null +++ b/tower/examples/server.rs @@ -0,0 +1,79 @@ +extern crate futures; +extern crate hyper; +extern crate tokio_tcp; +extern crate tower; +extern crate tower_hyper; +extern crate tower_in_flight_limit; +extern crate tower_service; + +use futures::{future, Future, Poll, Stream}; +use hyper::{Body, Request, Response}; +use tokio_tcp::TcpListener; +use tower::builder::ServiceBuilder; +use tower_hyper::body::LiftBody; +use tower_hyper::server::Server; +use tower_in_flight_limit::InFlightLimitLayer; +use tower_service::Service; + +fn main() { + hyper::rt::run(future::lazy(|| { + let addr = "127.0.0.1:3000".parse().unwrap(); + let bind = TcpListener::bind(&addr).expect("bind"); + + println!("Listening on http://{}", addr); + + let maker = ServiceBuilder::new() + .layer(InFlightLimitLayer::new(5)) + .build_make_service(MakeSvc); + + let server = Server::new(maker); + + bind.incoming() + .fold(server, |mut server, stream| { + if let Err(e) = stream.set_nodelay(true) { + return Err(e); + } + + hyper::rt::spawn( + server + .serve(stream) + .map_err(|e| panic!("Server error {:?}", e)), + ); + + Ok(server) + }) + .map_err(|e| panic!("serve error: {:?}", e)) + .map(|_| {}) + })); +} + +struct Svc; +impl Service> for Svc { + type Response = Response>; + type Error = hyper::Error; + type Future = future::FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(().into()) + } + + fn call(&mut self, _req: Request) -> Self::Future { + let res = Response::new(LiftBody::new(Body::from("Hello World!"))); + future::ok(res) + } +} + +struct MakeSvc; +impl Service<()> for MakeSvc { + type Response = Svc; + type Error = hyper::Error; + type Future = Box + Send + 'static>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(().into()) + } + + fn call(&mut self, _: ()) -> Self::Future { + Box::new(future::ok(Svc)) + } +} diff --git a/tower/src/builder/mod.rs b/tower/src/builder/mod.rs new file mode 100644 index 0000000..78d49a8 --- /dev/null +++ b/tower/src/builder/mod.rs @@ -0,0 +1,193 @@ +//! Builder types to compose layers and services + +mod service; + +pub use self::service::{LayeredMakeService, ServiceFuture}; + +use tower_layer::{ + util::{Chain, Identity}, + Layer, +}; +use tower_service::Service; +use tower_service_util::MakeService; + +pub(super) type Error = Box<::std::error::Error + Send + Sync>; + +/// `ServiceBuilder` provides a [builder-like interface](https://doc.rust-lang.org/1.0.0/style/ownership/builders.html) for composing Layers and a connection, where the latter is modeled by +/// a `MakeService`. The builder produces either a new `Service` or `MakeService`, +/// depending on whether `build_service` or `build_maker` is called. +/// +/// # Services and MakeServices +/// +/// - A [`Service`](tower_service::Service) is a trait representing an asynchronous +/// function of a request to a response. It is similar to +/// `async fn(Request) -> Result`. +/// - A [`MakeService`](tower_service_util::MakeService) is a trait creating specific +/// instances of a `Service` +/// +/// # Service +/// +/// A `Service` is typically bound to a single transport, such as a TCP connection. +/// It defines how _all_ inbound or outbound requests are handled by that connection. +/// +/// # MakeService +/// +/// Since a `Service` is bound to a single connection, a `MakeService` allows for the +/// creation of _new_ `Service`s that'll be bound to _different_ different connections. +/// This is useful for servers, as they require the ability to accept new connections. +/// +/// Resources that need to be shared by all `Service`s can be put into a +/// `MakeService`, and then passed to individual `Service`s when `build_maker` +/// is called. +/// +/// # Examples +/// +/// A MakeService stack with a single layer: +/// +/// ```rust +/// # extern crate tower; +/// # extern crate tower_in_flight_limit; +/// # extern crate futures; +/// # extern crate void; +/// # use void::Void; +/// # use tower::Service; +/// # use tower::builder::ServiceBuilder; +/// # use tower_in_flight_limit::InFlightLimitLayer; +/// # use futures::{Poll, future::{self, FutureResult}}; +/// # #[derive(Debug)] +/// # struct MyMakeService; +/// # impl Service<()> for MyMakeService { +/// # type Response = MyService; +/// # type Error = Void; +/// # type Future = FutureResult; +/// # fn poll_ready(&mut self) -> Poll<(), Self::Error> { +/// # Ok(().into()) +/// # } +/// # fn call(&mut self, _: ()) -> Self::Future { +/// # future::ok(MyService) +/// # } +/// # } +/// # #[derive(Debug)] +/// # struct MyService; +/// # impl Service<()> for MyService { +/// # type Response = (); +/// # type Error = Void; +/// # type Future = FutureResult; +/// # fn poll_ready(&mut self) -> Poll<(), Self::Error> { +/// # Ok(().into()) +/// # } +/// # fn call(&mut self, _: ()) -> Self::Future { +/// # future::ok(()) +/// # } +/// # } +/// ServiceBuilder::new() +/// .layer(InFlightLimitLayer::new(5)) +/// .build_make_service(MyMakeService); +/// ``` +/// +/// A `Service` stack with a single layer: +/// +/// ``` +/// # extern crate tower; +/// # extern crate tower_in_flight_limit; +/// # extern crate futures; +/// # extern crate void; +/// # use void::Void; +/// # use tower::Service; +/// # use tower::builder::ServiceBuilder; +/// # use tower_in_flight_limit::InFlightLimitLayer; +/// # use futures::{Poll, future::{self, FutureResult}}; +/// # #[derive(Debug)] +/// # struct MyService; +/// # impl Service<()> for MyService { +/// # type Response = (); +/// # type Error = Void; +/// # type Future = FutureResult; +/// # fn poll_ready(&mut self) -> Poll<(), Self::Error> { +/// # Ok(().into()) +/// # } +/// # fn call(&mut self, _: ()) -> Self::Future { +/// # future::ok(()) +/// # } +/// # } +/// ServiceBuilder::new() +/// .layer(InFlightLimitLayer::new(5)) +/// .build_service(MyService); +/// ``` +/// +/// A `Service` stack with _multiple_ layers that contain rate limiting, in-flight request limits, +/// and a channel-backed, clonable `Service`: +/// +/// ``` +/// # extern crate tower; +/// # extern crate tower_in_flight_limit; +/// # extern crate tower_buffer; +/// # extern crate tower_rate_limit; +/// # extern crate futures; +/// # extern crate void; +/// # use void::Void; +/// # use tower::Service; +/// # use tower::builder::ServiceBuilder; +/// # use tower_in_flight_limit::InFlightLimitLayer; +/// # use tower_buffer::BufferLayer; +/// # use tower_rate_limit::RateLimitLayer; +/// # use std::time::Duration; +/// # use futures::{Poll, future::{self, FutureResult}}; +/// # #[derive(Debug)] +/// # struct MyService; +/// # impl Service<()> for MyService { +/// # type Response = (); +/// # type Error = Void; +/// # type Future = FutureResult; +/// # fn poll_ready(&mut self) -> Poll<(), Self::Error> { +/// # Ok(().into()) +/// # } +/// # fn call(&mut self, _: ()) -> Self::Future { +/// # future::ok(()) +/// # } +/// # } +/// ServiceBuilder::new() +/// .layer(BufferLayer::new(5)) +/// .layer(InFlightLimitLayer::new(5)) +/// .layer(RateLimitLayer::new(5, Duration::from_secs(1))) +/// .build_service(MyService); +/// ``` +#[derive(Debug)] +pub struct ServiceBuilder { + layer: L, +} + +impl ServiceBuilder { + /// Create a new `ServiceBuilder` from a `MakeService`. + pub fn new() -> Self { + ServiceBuilder { + layer: Identity::new(), + } + } +} + +impl ServiceBuilder { + /// Layer a new layer `T` onto the `ServiceBuilder`. + pub fn layer(self, layer: T) -> ServiceBuilder> { + ServiceBuilder { + layer: Chain::new(layer, self.layer), + } + } + + /// Create a `LayeredMakeService` from the composed layers and transport `MakeService`. + pub fn build_make_service(self, mk: M) -> LayeredMakeService + where + M: MakeService, + { + LayeredMakeService::new(mk, self.layer) + } + + /// Wrap the service `S` with the layers. + pub fn build_service(self, service: S) -> Result + where + L: Layer, + S: Service, + { + self.layer.layer(service) + } +} diff --git a/tower/src/builder/service.rs b/tower/src/builder/service.rs new file mode 100644 index 0000000..7578e18 --- /dev/null +++ b/tower/src/builder/service.rs @@ -0,0 +1,79 @@ +use super::Error; +use futures::{Async, Future, Poll}; +use std::marker::PhantomData; +use std::sync::Arc; +use tower_layer::Layer; +use tower_service_util::MakeService; +use Service; + +/// Composed `MakeService` produced from `ServiceBuilder` +#[derive(Debug)] +pub struct LayeredMakeService { + maker: S, + layer: Arc, + _pd: PhantomData, +} + +/// Async resolve the MakeService and wrap it with the layers +#[derive(Debug)] +pub struct ServiceFuture +where + S: MakeService, +{ + inner: S::Future, + layer: Arc, +} + +impl LayeredMakeService { + pub(crate) fn new(maker: S, layer: L) -> Self { + LayeredMakeService { + maker, + layer: Arc::new(layer), + _pd: PhantomData, + } + } +} + +impl Service for LayeredMakeService +where + S: MakeService, + S::MakeError: Into, + L: Layer + Sync + Send + 'static, + L::LayerError: Into, + Target: Clone, +{ + type Response = L::Service; + type Error = Error; + type Future = ServiceFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.maker.poll_ready().map_err(Into::into) + } + + fn call(&mut self, target: Target) -> Self::Future { + let inner = self.maker.make_service(target); + let layer = Arc::clone(&self.layer); + + ServiceFuture { inner, layer } + } +} + +impl Future for ServiceFuture +where + S: MakeService, + S::MakeError: Into, + L: Layer, + L::LayerError: Into, +{ + type Item = L::Service; + type Error = Error; + + fn poll(&mut self) -> Poll { + let service = try_ready!(self.inner.poll().map_err(Into::into)); + + match self.layer.layer(service) { + Ok(service) => Ok(Async::Ready(service)), + Err(e) => Err(e.into()), + } + } +} diff --git a/tower/src/lib.rs b/tower/src/lib.rs index 1e808ab..f9b7e8b 100644 --- a/tower/src/lib.rs +++ b/tower/src/lib.rs @@ -1,14 +1,17 @@ //! Various utility types and functions that are generally with Tower. +pub mod builder; #[macro_use] extern crate futures; #[cfg(test)] extern crate tokio_mock_task; +extern crate tower_layer; extern crate tower_service; extern crate tower_service_util; pub mod util; +pub use builder::ServiceBuilder; pub use tower_service::Service; pub use tower_service_util::MakeConnection; pub use tower_service_util::MakeService; diff --git a/tower/tests/builder.rs b/tower/tests/builder.rs new file mode 100644 index 0000000..ba7baeb --- /dev/null +++ b/tower/tests/builder.rs @@ -0,0 +1,136 @@ +extern crate futures; +extern crate tokio; +extern crate tower; +extern crate tower_buffer; +extern crate tower_in_flight_limit; +extern crate tower_rate_limit; +extern crate tower_reconnect; +extern crate tower_retry; +extern crate tower_service; +extern crate void; + +use futures::future::{self, FutureResult}; +use futures::prelude::*; +use std::time::Duration; +use tower::builder::ServiceBuilder; +use tower_buffer::BufferLayer; +use tower_in_flight_limit::InFlightLimitLayer; +use tower_rate_limit::RateLimitLayer; +use tower_reconnect::Reconnect; +use tower_retry::{Policy, RetryLayer}; +use tower_service::*; +use void::Void; + +#[test] +fn builder_make_service() { + tokio::run(future::lazy(|| { + let maker = ServiceBuilder::new() + .layer(BufferLayer::new(5)) + .layer(InFlightLimitLayer::new(5)) + .layer(RateLimitLayer::new(5, Duration::from_secs(1))) + .build_make_service(MockMaker); + + let mut client = Reconnect::new(maker, ()); + + client.poll_ready().unwrap(); + client + .call(Request) + .map(|_| ()) + .map_err(|_| panic!("this is bad")) + })); +} + +#[test] +fn builder_service() { + tokio::run(future::lazy(|| { + let mut client = ServiceBuilder::new() + .layer(BufferLayer::new(5)) + .layer(InFlightLimitLayer::new(5)) + .layer(RateLimitLayer::new(5, Duration::from_secs(1))) + .build_service(MockSvc) + .unwrap(); + + client.poll_ready().unwrap(); + client + .call(Request) + .map(|_| ()) + .map_err(|_| panic!("this is bad")) + })); +} + +#[test] +fn builder_make_service_retry() { + tokio::run(future::lazy(|| { + let policy = MockPolicy; + + let maker = ServiceBuilder::new() + .layer(BufferLayer::new(5)) + .layer(RateLimitLayer::new(5, Duration::from_secs(1))) + .layer(InFlightLimitLayer::new(5)) + .layer(RetryLayer::new(policy)) + .layer(BufferLayer::new(5)) + .build_make_service(MockMaker); + + let mut client = Reconnect::new(maker, ()); + + client.poll_ready().unwrap(); + client + .call(Request) + .map(|_| ()) + .map_err(|_| panic!("this is bad")) + })); +} + +#[derive(Debug)] +struct MockMaker; +impl Service<()> for MockMaker { + type Response = MockSvc; + type Error = Void; + type Future = FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(().into()) + } + + fn call(&mut self, _: ()) -> Self::Future { + future::ok(MockSvc) + } +} + +#[derive(Debug, Clone)] +struct Request; +#[derive(Debug, Clone)] +struct Response; +#[derive(Debug)] +struct MockSvc; +impl Service for MockSvc { + type Response = Response; + type Error = Void; + type Future = FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(().into()) + } + + fn call(&mut self, _: Request) -> Self::Future { + future::ok(Response) + } +} + +#[derive(Debug, Clone)] +struct MockPolicy; + +impl Policy for MockPolicy +where + E: Into>, +{ + type Future = FutureResult; + + fn retry(&self, _req: &Request, _result: Result<&Response, &E>) -> Option { + None + } + + fn clone_request(&self, req: &Request) -> Option { + Some(req.clone()) + } +}