diff --git a/Cargo.toml b/Cargo.toml index 13e0cbe..7fe684c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ members = [ "tower-filter", "tower-in-flight-limit", "tower-mock", + "tower-layer", "tower-rate-limit", "tower-reconnect", "tower-retry", diff --git a/README.md b/README.md index 21d2ffb..43d8809 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,9 @@ crates. * [`tower-service`]: The foundational traits upon which Tower is built ([docs][ts-docs]). +* [`tower-layer`]: The foundational trait to compose services together + ([docs][tl-docs]). + * [`tower-balance`]: A load balancer. Load is balanced across a number of services ([docs][tb-docs]). @@ -73,6 +76,8 @@ terms or conditions. [`tower-service`]: tower-service [ts-docs]: https://docs.rs/tower-service/ +[`tower-layer`]: tower-layer +[tl-docs]: https://docs.rs/tower-layer/ [`tower-balance`]: tower-balance [tb-docs]: https://tower-rs.github.io/tower/tower_balance/index.html [`tower-buffer`]: tower-buffer diff --git a/tower-buffer/Cargo.toml b/tower-buffer/Cargo.toml index bc629ff..c295ea7 100644 --- a/tower-buffer/Cargo.toml +++ b/tower-buffer/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] futures = "0.1" tower-service = { version = "0.2", path = "../tower-service" } +tower-layer = { version = "0.1", path = "../tower-layer" } tokio-executor = "0.1" tokio-sync = "0.1" diff --git a/tower-buffer/src/lib.rs b/tower-buffer/src/lib.rs index e23fe70..c27fa58 100644 --- a/tower-buffer/src/lib.rs +++ b/tower-buffer/src/lib.rs @@ -9,6 +9,7 @@ extern crate futures; extern crate tokio_executor; extern crate tokio_sync; +extern crate tower_layer; extern crate tower_service; pub mod error; @@ -27,6 +28,7 @@ use futures::Poll; use tokio_executor::DefaultExecutor; use tokio_sync::mpsc; use tokio_sync::oneshot; +use tower_layer::Layer; use tower_service::Service; /// Adds a buffer in front of an inner service. @@ -40,6 +42,48 @@ where worker: worker::Handle, } +/// Buffer requests with a bounded buffer +pub struct BufferLayer { + bound: usize, + executor: E, +} + +impl BufferLayer { + pub fn new(bound: usize) -> Self { + BufferLayer { + bound, + executor: DefaultExecutor::current(), + } + } +} + +impl BufferLayer { + pub fn with_executor(bound: usize, executor: E) -> Self + where + S: Service, + S::Error: Into, + E: WorkerExecutor, + { + BufferLayer { bound, executor } + } +} + +impl Layer for BufferLayer +where + S: Service, + S::Error: Into, + E: WorkerExecutor, +{ + type Response = S::Response; + type Error = Error; + type LayerError = SpawnError; + type Service = Buffer; + + fn layer(&self, service: S) -> Result { + Buffer::with_executor(service, self.bound, &self.executor) + } +} + impl Buffer where T: Service, diff --git a/tower-filter/Cargo.toml b/tower-filter/Cargo.toml index b868ba7..724bda5 100644 --- a/tower-filter/Cargo.toml +++ b/tower-filter/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] futures = "0.1" tower-service = { version = "0.2", path = "../tower-service" } +tower-layer = { version = "0.1", path = "../tower-layer" } [dev-dependencies] tower-mock = { version = "0.1", path = "../tower-mock" } diff --git a/tower-filter/src/lib.rs b/tower-filter/src/lib.rs index 76b51bd..2e56031 100644 --- a/tower-filter/src/lib.rs +++ b/tower-filter/src/lib.rs @@ -2,10 +2,12 @@ //! a predicate. extern crate futures; +extern crate tower_layer; extern crate tower_service; use futures::task::AtomicTask; use futures::{Async, Future, IntoFuture, Poll}; +use tower_layer::Layer; use tower_service::Service; use std::sync::atomic::AtomicUsize; @@ -21,6 +23,11 @@ pub struct Filter { counts: Arc, } +pub struct FilterLayer { + predicate: U, + buffer: usize, +} + pub struct ResponseFuture where S: Service, @@ -76,6 +83,30 @@ enum State { // ===== impl Filter ===== +impl FilterLayer { + pub fn new(predicate: U, buffer: usize) -> Self { + FilterLayer { predicate, buffer } + } +} + +impl Layer for FilterLayer +where + U: Predicate + Clone, + S: Service + Clone, +{ + type Response = S::Response; + type Error = Error; + type LayerError = (); + type Service = Filter; + + fn layer(&self, service: S) -> Result { + let predicate = self.predicate.clone(); + Ok(Filter::new(service, predicate, self.buffer)) + } +} + +// ===== impl Filter ===== + impl Filter { pub fn new(inner: T, predicate: U, buffer: usize) -> Self where diff --git a/tower-in-flight-limit/Cargo.toml b/tower-in-flight-limit/Cargo.toml index 4298d38..07620f7 100644 --- a/tower-in-flight-limit/Cargo.toml +++ b/tower-in-flight-limit/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] futures = "0.1" tower-service = { version = "0.2", path = "../tower-service" } +tower-layer = { version = "0.1", path = "../tower-layer" } [dev-dependencies] tokio-test = { git = "https://github.com/carllerche/tokio-test" } diff --git a/tower-in-flight-limit/src/lib.rs b/tower-in-flight-limit/src/lib.rs index dadebe8..0e5c250 100644 --- a/tower-in-flight-limit/src/lib.rs +++ b/tower-in-flight-limit/src/lib.rs @@ -2,8 +2,10 @@ //! service. extern crate futures; +extern crate tower_layer; extern crate tower_service; +use tower_layer::Layer; use tower_service::Service; use futures::task::AtomicTask; @@ -19,6 +21,11 @@ pub struct InFlightLimit { state: State, } +#[derive(Debug, Clone)] +pub struct InFlightLimitLayer { + max: usize, +} + /// Error returned when the service has reached its limit. #[derive(Debug)] pub enum Error { @@ -124,6 +131,28 @@ where } } +// ===== impl InFlightLimitLayer ===== + +impl InFlightLimitLayer { + pub fn new(max: usize) -> Self { + InFlightLimitLayer { max } + } +} + +impl Layer for InFlightLimitLayer +where + S: Service, +{ + type Response = S::Response; + type Error = Error; + type LayerError = (); + type Service = InFlightLimit; + + fn layer(&self, service: S) -> Result { + Ok(InFlightLimit::new(service, self.max)) + } +} + // ===== impl ResponseFuture ===== impl Future for ResponseFuture diff --git a/tower-layer/Cargo.toml b/tower-layer/Cargo.toml new file mode 100644 index 0000000..f323344 --- /dev/null +++ b/tower-layer/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "tower-layer" +# When releasing to crates.io: +# - Update html_root_url. +# - Update documentation URL +# - Create "v0.x.y" git tag. +version = "0.1.0" +authors = ["Carl Lerche "] +license = "MIT" +readme = "README.md" +repository = "https://github.com/tower-rs/tower" +homepage = "https://github.com/tower-rs/tower" +documentation = "https://docs.rs/tokio-layer/0.1.0" +description = """ +Decorates a `Service` to allow easy composition between `Service`s. +""" +categories = ["asynchronous", "network-programming"] + +[dependencies] +futures = "0.1" +tower-service = { version = "0.2", path = "../tower-service" } + +[features] +default = ["ext"] +ext = [] \ No newline at end of file diff --git a/tower-layer/LICENSE b/tower-layer/LICENSE new file mode 100644 index 0000000..b980cac --- /dev/null +++ b/tower-layer/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2019 Tower Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/tower-layer/README.md b/tower-layer/README.md new file mode 100644 index 0000000..b7def25 --- /dev/null +++ b/tower-layer/README.md @@ -0,0 +1,21 @@ +# Tower Layer + +Decorates a `Service`, transforming either the request or the response. + +## Overview + +Often, many of the pieces needed for writing network applications can be +reused across multiple services. The `Layer` trait can be used to write +reusable components that can be applied to very different kinds of services; +for example, it can be applied to services operating on different protocols, +and to both the client and server side of a network transaction. + +## License + +This project is licensed under the [MIT license](LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tower by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/tower-layer/src/lib.rs b/tower-layer/src/lib.rs new file mode 100644 index 0000000..0f35bb3 --- /dev/null +++ b/tower-layer/src/lib.rs @@ -0,0 +1,110 @@ +//! Layer traits and extensions. +//! +//! A layer decorates an service and provides additional functionality. It +//! allows other services to be composed with the service that implements layer. +//! +//! A middleware implements the [`Layer`] and [`Service`] trait. + +#![deny(missing_docs)] +#![doc(html_root_url = "https://docs.rs/tower-layer/0.1.0")] + +extern crate futures; +extern crate tower_service; + +#[cfg(feature = "util")] +pub mod util; + +#[cfg(feature = "util")] +pub use util::LayerExt; + +use tower_service::Service; + +/// Decorates a `Service`, transforming either the request or the response. +/// +/// Often, many of the pieces needed for writing network applications can be +/// reused across multiple services. The `Layer` trait can be used to write +/// reusable components that can be applied to very different kinds of services; +/// for example, it can be applied to services operating on different protocols, +/// and to both the client and server side of a network transaction. +/// +/// # Log +/// +/// Take request logging as an example: +/// +/// ```rust +/// # extern crate futures; +/// # extern crate tower_service; +/// # use tower_service::Service; +/// # use futures::{Poll, Async}; +/// # use tower_layer::Layer; +/// # use std::fmt; +/// +/// pub struct LogLayer { +/// target: &'static str, +/// } +/// +/// impl Layer for LogLayer +/// where +/// S: Service, +/// Request: fmt::Debug, +/// { +/// type Response = S::Response; +/// type Error = S::Error; +/// type LayerError = (); +/// type Service = LogService; +/// +/// fn layer(&self, service: S) -> Result { +/// Ok(LogService { +/// target: self.target, +/// service +/// }) +/// } +/// } +/// +/// // This service implements the Log behavior +/// pub struct LogService { +/// target: &'static str, +/// service: S, +/// } +/// +/// impl Service for LogService +/// where +/// S: Service, +/// Request: fmt::Debug, +/// { +/// type Response = S::Response; +/// type Error = S::Error; +/// type Future = S::Future; +/// +/// fn poll_ready(&mut self) -> Poll<(), Self::Error> { +/// self.service.poll_ready() +/// } +/// +/// fn call(&mut self, request: Request) -> Self::Future { +/// // Insert log statement here or other functionality +/// println!("request = {:?}, target = {:?}", request, self.target); +/// self.service.call(request) +/// } +/// } +/// ``` +/// +/// The above log implementation is decoupled from the underlying protocol and +/// is also decoupled from client or server concerns. In other words, the same +/// log middleware could be used in either a client or a server. +pub trait Layer { + /// The wrapped service response type + type Response; + + /// The wrapped service's error type + type Error; + + /// The error produced when calling `layer` + type LayerError; + + /// The wrapped service + type Service: Service; + + /// Wrap the given service with the middleware, returning a new service + /// that has been decorated with the middleware. + fn layer(&self, inner: S) -> Result; +} diff --git a/tower-layer/src/util/chain.rs b/tower-layer/src/util/chain.rs new file mode 100644 index 0000000..8026952 --- /dev/null +++ b/tower-layer/src/util/chain.rs @@ -0,0 +1,33 @@ +use tower_service::Service; +use Layer; + +/// Two middlewares chained together. +/// +/// This type is produced by `Layer::chain`. +#[derive(Debug)] +pub struct Chain { + inner: Inner, + outer: Outer, +} + +impl Chain { + /// Create a new `Chain`. + pub fn new(inner: Inner, outer: Outer) -> Self { + Chain { inner, outer } + } +} + +impl Layer for Chain +where + S: Service, + Inner: Layer, + Outer: Layer, +{ + type Response = Outer::Response; + type Error = Outer::Error; + type Service = Outer::Service; + + fn layer(&self, service: S) -> Self::Service { + self.outer.wrap(self.inner.wrap(service)) + } +} diff --git a/tower-layer/src/util/mod.rs b/tower-layer/src/util/mod.rs new file mode 100644 index 0000000..7d7548c --- /dev/null +++ b/tower-layer/src/util/mod.rs @@ -0,0 +1,23 @@ +use Layer; + +mod chain; + +pub use self::chain::Chain; + +/// An extension trait for `Layer`'s that provides a variety of convenient +/// adapters. +pub trait LayerExt: Layer { + /// Return a new `Layer` instance that applies both `self` and + /// `middleware` to services being wrapped. + /// + /// This defines a middleware stack. + fn chain(self, middleware: T) -> Chain + where + T: Layer, + Self: Sized, + { + Chain::new(self, middleware) + } +} + +impl LayerExt for T where T: Layer {} diff --git a/tower-rate-limit/Cargo.toml b/tower-rate-limit/Cargo.toml index 759360f..3e4d7ea 100644 --- a/tower-rate-limit/Cargo.toml +++ b/tower-rate-limit/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] futures = "0.1" tower-service = { version = "0.2", path = "../tower-service" } +tower-layer = { version = "0.1", path = "../tower-layer"} tokio-timer = "0.2.6" [dev-dependencies] diff --git a/tower-rate-limit/src/lib.rs b/tower-rate-limit/src/lib.rs index f9136f4..a3f8a01 100644 --- a/tower-rate-limit/src/lib.rs +++ b/tower-rate-limit/src/lib.rs @@ -4,10 +4,12 @@ #[macro_use] extern crate futures; extern crate tokio_timer; +extern crate tower_layer; extern crate tower_service; use futures::{Future, Poll}; use tokio_timer::Delay; +use tower_layer::Layer; use tower_service::Service; use std::time::{Duration, Instant}; @@ -20,6 +22,11 @@ pub struct RateLimit { state: State, } +#[derive(Debug)] +pub struct RateLimitLayer { + rate: Rate, +} + #[derive(Debug, Copy, Clone)] pub struct Rate { num: u64, @@ -46,6 +53,27 @@ enum State { Ready { until: Instant, rem: u64 }, } +impl RateLimitLayer { + pub fn new(num: u64, per: Duration) -> Self { + let rate = Rate { num, per }; + RateLimitLayer { rate } + } +} + +impl Layer for RateLimitLayer +where + S: Service, +{ + type Response = S::Response; + type Error = Error; + type LayerError = (); + type Service = RateLimit; + + fn layer(&self, service: S) -> Result { + Ok(RateLimit::new(service, self.rate)) + } +} + impl RateLimit { /// Create a new rate limiter pub fn new(inner: T, rate: Rate) -> Self diff --git a/tower-retry/Cargo.toml b/tower-retry/Cargo.toml index 745b5c9..4bc87ad 100644 --- a/tower-retry/Cargo.toml +++ b/tower-retry/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] futures = "0.1" tower-service = { version = "0.2", path = "../tower-service" } +tower-layer = { version = "0.1", path = "../tower-layer" } tokio-timer = "0.2.4" [dev-dependencies] diff --git a/tower-retry/src/lib.rs b/tower-retry/src/lib.rs index 0b73cc4..f25b958 100644 --- a/tower-retry/src/lib.rs +++ b/tower-retry/src/lib.rs @@ -7,9 +7,11 @@ #[macro_use] extern crate futures; extern crate tokio_timer; +extern crate tower_layer; extern crate tower_service; use futures::{Async, Future, Poll}; +use tower_layer::Layer; use tower_service::Service; pub mod budget; @@ -89,6 +91,12 @@ pub struct Retry { service: S, } +/// Retry requests based on a policy +#[derive(Debug)] +pub struct RetryLayer

{ + policy: P, +} + /// The `Future` returned by a `Retry` service. #[derive(Debug)] pub struct ResponseFuture @@ -111,6 +119,31 @@ enum State { Retrying, } +// ===== impl RetryLayer ===== + +impl

RetryLayer

{ + /// Create a new `RetryLayer` from a retry policy + pub fn new(policy: P) -> Self { + RetryLayer { policy } + } +} + +impl Layer for RetryLayer

+where + S: Service + Clone, + P: Policy + Clone, +{ + type Response = S::Response; + type Error = S::Error; + type LayerError = (); + type Service = Retry; + + fn layer(&self, service: S) -> Result { + let policy = self.policy.clone(); + Ok(Retry::new(policy, service)) + } +} + // ===== impl Retry ===== impl Retry { diff --git a/tower-service/src/lib.rs b/tower-service/src/lib.rs index f4e8c7c..af2b054 100644 --- a/tower-service/src/lib.rs +++ b/tower-service/src/lib.rs @@ -80,7 +80,7 @@ use futures::{Future, Poll}; /// println!("Redis response: {:?}", await(resp)); /// ``` /// -/// # Middleware +/// # Middleware / Layer /// /// More often than not, all the pieces needed for writing robust, scalable /// network applications are the same no matter the underlying protocol. By @@ -92,6 +92,7 @@ use futures::{Future, Poll}; /// /// ```rust,ignore /// use tower_service::Service; +/// use tower_layer::Layer; /// use futures::Future; /// use std::time::Duration; /// @@ -103,6 +104,8 @@ use futures::{Future, Poll}; /// timer: Timer, /// } /// +/// pub struct TimeoutLayer(Duration); +/// /// pub struct Expired; /// /// impl Timeout { @@ -140,6 +143,25 @@ use futures::{Future, Poll}; /// } /// } /// +/// impl TimeoutLayer { +/// pub fn new(delay: Duration) -> Self { +/// TimeoutLayer(delay) +/// } +/// } +/// +/// impl Layer for TimeoutLayer +/// where +/// S: Service, +/// { +/// type Response = S::Response; +/// type Error = S::Error; +/// type Service = Timeout; +/// +/// fn layer(&self, service: S) -> Timeout { +/// Timeout::new(service, self.0) +/// } +/// } +/// /// ``` /// /// The above timeout implementation is decoupled from the underlying protocol diff --git a/tower-timeout/Cargo.toml b/tower-timeout/Cargo.toml index 0829ae1..8d922d8 100644 --- a/tower-timeout/Cargo.toml +++ b/tower-timeout/Cargo.toml @@ -7,4 +7,5 @@ publish = false [dependencies] futures = "0.1" tower-service = { version = "0.2", path = "../tower-service" } +tower-layer = { version = "0.1", path = "../tower-layer" } tokio-timer = "0.2.6" diff --git a/tower-timeout/src/lib.rs b/tower-timeout/src/lib.rs index 68bd347..7df3aa8 100644 --- a/tower-timeout/src/lib.rs +++ b/tower-timeout/src/lib.rs @@ -9,10 +9,12 @@ extern crate futures; extern crate tokio_timer; +extern crate tower_layer; extern crate tower_service; use futures::{Async, Future, Poll}; use tokio_timer::{clock, Delay}; +use tower_layer::Layer; use tower_service::Service; use std::time::Duration; @@ -28,6 +30,12 @@ pub struct Timeout { timeout: Duration, } +/// Applies a timeout to requests via the supplied inner service. +#[derive(Debug)] +pub struct TimeoutLayer { + timeout: Duration, +} + /// `Timeout` response future #[derive(Debug)] pub struct ResponseFuture { @@ -35,6 +43,30 @@ pub struct ResponseFuture { sleep: Delay, } +// ===== impl TimeoutLayer ===== + +impl TimeoutLayer { + /// Create a timeout from a duration + pub fn new(timeout: Duration) -> Self { + TimeoutLayer { timeout } + } +} + +impl Layer for TimeoutLayer +where + S: Service, + S::Error: Into, +{ + type Response = S::Response; + type Error = Error; + type LayerError = (); + type Service = Timeout; + + fn layer(&self, service: S) -> Result { + Ok(Timeout::new(service, self.timeout)) + } +} + // ===== impl Timeout ===== impl Timeout {