layer: Add `tower-layer` and the `Layer` trait (#163)

This change introduces the new `tower-layer` crate and the foundational `Layer` trait to go along with it. This trait allows one to easily compose a set of `Service`s that take an inner service. These services only modify the request/response. This also provides the `Layer` implementation for many of the tower crates.
This commit is contained in:
Lucio Franco 2019-02-27 15:28:42 -05:00 committed by GitHub
parent 79a98ea05d
commit c5d70481bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 469 additions and 1 deletions

View File

@ -32,6 +32,7 @@ members = [
"tower-filter",
"tower-in-flight-limit",
"tower-mock",
"tower-layer",
"tower-rate-limit",
"tower-reconnect",
"tower-retry",

View File

@ -20,6 +20,9 @@ crates.
* [`tower-service`]: The foundational traits upon which Tower is built
([docs][ts-docs]).
* [`tower-layer`]: The foundational trait to compose services together
([docs][tl-docs]).
* [`tower-balance`]: A load balancer. Load is balanced across a number of
services ([docs][tb-docs]).
@ -73,6 +76,8 @@ terms or conditions.
[`tower-service`]: tower-service
[ts-docs]: https://docs.rs/tower-service/
[`tower-layer`]: tower-layer
[tl-docs]: https://docs.rs/tower-layer/
[`tower-balance`]: tower-balance
[tb-docs]: https://tower-rs.github.io/tower/tower_balance/index.html
[`tower-buffer`]: tower-buffer

View File

@ -7,6 +7,7 @@ publish = false
[dependencies]
futures = "0.1"
tower-service = { version = "0.2", path = "../tower-service" }
tower-layer = { version = "0.1", path = "../tower-layer" }
tokio-executor = "0.1"
tokio-sync = "0.1"

View File

@ -9,6 +9,7 @@
extern crate futures;
extern crate tokio_executor;
extern crate tokio_sync;
extern crate tower_layer;
extern crate tower_service;
pub mod error;
@ -27,6 +28,7 @@ use futures::Poll;
use tokio_executor::DefaultExecutor;
use tokio_sync::mpsc;
use tokio_sync::oneshot;
use tower_layer::Layer;
use tower_service::Service;
/// Adds a buffer in front of an inner service.
@ -40,6 +42,48 @@ where
worker: worker::Handle,
}
/// Buffer requests with a bounded buffer
pub struct BufferLayer<E = DefaultExecutor> {
bound: usize,
executor: E,
}
impl BufferLayer<DefaultExecutor> {
pub fn new(bound: usize) -> Self {
BufferLayer {
bound,
executor: DefaultExecutor::current(),
}
}
}
impl<E> BufferLayer<E> {
pub fn with_executor<S, Request>(bound: usize, executor: E) -> Self
where
S: Service<Request>,
S::Error: Into<Error>,
E: WorkerExecutor<S, Request>,
{
BufferLayer { bound, executor }
}
}
impl<E, S, Request> Layer<S, Request> for BufferLayer<E>
where
S: Service<Request>,
S::Error: Into<Error>,
E: WorkerExecutor<S, Request>,
{
type Response = S::Response;
type Error = Error;
type LayerError = SpawnError<S>;
type Service = Buffer<S, Request>;
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
Buffer::with_executor(service, self.bound, &self.executor)
}
}
impl<T, Request> Buffer<T, Request>
where
T: Service<Request>,

View File

@ -7,6 +7,7 @@ publish = false
[dependencies]
futures = "0.1"
tower-service = { version = "0.2", path = "../tower-service" }
tower-layer = { version = "0.1", path = "../tower-layer" }
[dev-dependencies]
tower-mock = { version = "0.1", path = "../tower-mock" }

View File

@ -2,10 +2,12 @@
//! a predicate.
extern crate futures;
extern crate tower_layer;
extern crate tower_service;
use futures::task::AtomicTask;
use futures::{Async, Future, IntoFuture, Poll};
use tower_layer::Layer;
use tower_service::Service;
use std::sync::atomic::AtomicUsize;
@ -21,6 +23,11 @@ pub struct Filter<T, U> {
counts: Arc<Counts>,
}
pub struct FilterLayer<U> {
predicate: U,
buffer: usize,
}
pub struct ResponseFuture<T, S, Request>
where
S: Service<Request>,
@ -76,6 +83,30 @@ enum State<Request, U> {
// ===== impl Filter =====
impl<U> FilterLayer<U> {
pub fn new(predicate: U, buffer: usize) -> Self {
FilterLayer { predicate, buffer }
}
}
impl<U, S, Request> Layer<S, Request> for FilterLayer<U>
where
U: Predicate<Request> + Clone,
S: Service<Request> + Clone,
{
type Response = S::Response;
type Error = Error<U::Error, S::Error>;
type LayerError = ();
type Service = Filter<S, U>;
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
let predicate = self.predicate.clone();
Ok(Filter::new(service, predicate, self.buffer))
}
}
// ===== impl Filter =====
impl<T, U> Filter<T, U> {
pub fn new<Request>(inner: T, predicate: U, buffer: usize) -> Self
where

View File

@ -7,6 +7,7 @@ publish = false
[dependencies]
futures = "0.1"
tower-service = { version = "0.2", path = "../tower-service" }
tower-layer = { version = "0.1", path = "../tower-layer" }
[dev-dependencies]
tokio-test = { git = "https://github.com/carllerche/tokio-test" }

View File

@ -2,8 +2,10 @@
//! service.
extern crate futures;
extern crate tower_layer;
extern crate tower_service;
use tower_layer::Layer;
use tower_service::Service;
use futures::task::AtomicTask;
@ -19,6 +21,11 @@ pub struct InFlightLimit<T> {
state: State,
}
#[derive(Debug, Clone)]
pub struct InFlightLimitLayer {
max: usize,
}
/// Error returned when the service has reached its limit.
#[derive(Debug)]
pub enum Error<T> {
@ -124,6 +131,28 @@ where
}
}
// ===== impl InFlightLimitLayer =====
impl InFlightLimitLayer {
pub fn new(max: usize) -> Self {
InFlightLimitLayer { max }
}
}
impl<S, Request> Layer<S, Request> for InFlightLimitLayer
where
S: Service<Request>,
{
type Response = S::Response;
type Error = Error<S::Error>;
type LayerError = ();
type Service = InFlightLimit<S>;
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
Ok(InFlightLimit::new(service, self.max))
}
}
// ===== impl ResponseFuture =====
impl<T> Future for ResponseFuture<T>

25
tower-layer/Cargo.toml Normal file
View File

@ -0,0 +1,25 @@
[package]
name = "tower-layer"
# When releasing to crates.io:
# - Update html_root_url.
# - Update documentation URL
# - Create "v0.x.y" git tag.
version = "0.1.0"
authors = ["Carl Lerche <me@carllerche.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tokio-layer/0.1.0"
description = """
Decorates a `Service` to allow easy composition between `Service`s.
"""
categories = ["asynchronous", "network-programming"]
[dependencies]
futures = "0.1"
tower-service = { version = "0.2", path = "../tower-service" }
[features]
default = ["ext"]
ext = []

25
tower-layer/LICENSE Normal file
View File

@ -0,0 +1,25 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

21
tower-layer/README.md Normal file
View File

@ -0,0 +1,21 @@
# Tower Layer
Decorates a `Service`, transforming either the request or the response.
## Overview
Often, many of the pieces needed for writing network applications can be
reused across multiple services. The `Layer` trait can be used to write
reusable components that can be applied to very different kinds of services;
for example, it can be applied to services operating on different protocols,
and to both the client and server side of a network transaction.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

110
tower-layer/src/lib.rs Normal file
View File

@ -0,0 +1,110 @@
//! Layer traits and extensions.
//!
//! A layer decorates an service and provides additional functionality. It
//! allows other services to be composed with the service that implements layer.
//!
//! A middleware implements the [`Layer`] and [`Service`] trait.
#![deny(missing_docs)]
#![doc(html_root_url = "https://docs.rs/tower-layer/0.1.0")]
extern crate futures;
extern crate tower_service;
#[cfg(feature = "util")]
pub mod util;
#[cfg(feature = "util")]
pub use util::LayerExt;
use tower_service::Service;
/// Decorates a `Service`, transforming either the request or the response.
///
/// Often, many of the pieces needed for writing network applications can be
/// reused across multiple services. The `Layer` trait can be used to write
/// reusable components that can be applied to very different kinds of services;
/// for example, it can be applied to services operating on different protocols,
/// and to both the client and server side of a network transaction.
///
/// # Log
///
/// Take request logging as an example:
///
/// ```rust
/// # extern crate futures;
/// # extern crate tower_service;
/// # use tower_service::Service;
/// # use futures::{Poll, Async};
/// # use tower_layer::Layer;
/// # use std::fmt;
///
/// pub struct LogLayer {
/// target: &'static str,
/// }
///
/// impl<S, Request> Layer<S, Request> for LogLayer
/// where
/// S: Service<Request>,
/// Request: fmt::Debug,
/// {
/// type Response = S::Response;
/// type Error = S::Error;
/// type LayerError = ();
/// type Service = LogService<S>;
///
/// fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
/// Ok(LogService {
/// target: self.target,
/// service
/// })
/// }
/// }
///
/// // This service implements the Log behavior
/// pub struct LogService<S> {
/// target: &'static str,
/// service: S,
/// }
///
/// impl<S, Request> Service<Request> for LogService<S>
/// where
/// S: Service<Request>,
/// Request: fmt::Debug,
/// {
/// type Response = S::Response;
/// type Error = S::Error;
/// type Future = S::Future;
///
/// fn poll_ready(&mut self) -> Poll<(), Self::Error> {
/// self.service.poll_ready()
/// }
///
/// fn call(&mut self, request: Request) -> Self::Future {
/// // Insert log statement here or other functionality
/// println!("request = {:?}, target = {:?}", request, self.target);
/// self.service.call(request)
/// }
/// }
/// ```
///
/// The above log implementation is decoupled from the underlying protocol and
/// is also decoupled from client or server concerns. In other words, the same
/// log middleware could be used in either a client or a server.
pub trait Layer<S, Request> {
/// The wrapped service response type
type Response;
/// The wrapped service's error type
type Error;
/// The error produced when calling `layer`
type LayerError;
/// The wrapped service
type Service: Service<Request, Response = Self::Response, Error = Self::Error>;
/// Wrap the given service with the middleware, returning a new service
/// that has been decorated with the middleware.
fn layer(&self, inner: S) -> Result<Self::Service, Self::LayerError>;
}

View File

@ -0,0 +1,33 @@
use tower_service::Service;
use Layer;
/// Two middlewares chained together.
///
/// This type is produced by `Layer::chain`.
#[derive(Debug)]
pub struct Chain<Inner, Outer> {
inner: Inner,
outer: Outer,
}
impl<Inner, Outer> Chain<Inner, Outer> {
/// Create a new `Chain`.
pub fn new(inner: Inner, outer: Outer) -> Self {
Chain { inner, outer }
}
}
impl<S, Request, Inner, Outer> Layer<S, Request> for Chain<Inner, Outer>
where
S: Service<Request>,
Inner: Layer<S, Request>,
Outer: Layer<Inner::Service, Request>,
{
type Response = Outer::Response;
type Error = Outer::Error;
type Service = Outer::Service;
fn layer(&self, service: S) -> Self::Service {
self.outer.wrap(self.inner.wrap(service))
}
}

View File

@ -0,0 +1,23 @@
use Layer;
mod chain;
pub use self::chain::Chain;
/// An extension trait for `Layer`'s that provides a variety of convenient
/// adapters.
pub trait LayerExt<S, Request>: Layer<S, Request> {
/// Return a new `Layer` instance that applies both `self` and
/// `middleware` to services being wrapped.
///
/// This defines a middleware stack.
fn chain<T>(self, middleware: T) -> Chain<Self, T>
where
T: Layer<S, Request>,
Self: Sized,
{
Chain::new(self, middleware)
}
}
impl<T, S, Request> LayerExt<S, Request> for T where T: Layer<S, Request> {}

View File

@ -7,6 +7,7 @@ publish = false
[dependencies]
futures = "0.1"
tower-service = { version = "0.2", path = "../tower-service" }
tower-layer = { version = "0.1", path = "../tower-layer"}
tokio-timer = "0.2.6"
[dev-dependencies]

View File

@ -4,10 +4,12 @@
#[macro_use]
extern crate futures;
extern crate tokio_timer;
extern crate tower_layer;
extern crate tower_service;
use futures::{Future, Poll};
use tokio_timer::Delay;
use tower_layer::Layer;
use tower_service::Service;
use std::time::{Duration, Instant};
@ -20,6 +22,11 @@ pub struct RateLimit<T> {
state: State,
}
#[derive(Debug)]
pub struct RateLimitLayer {
rate: Rate,
}
#[derive(Debug, Copy, Clone)]
pub struct Rate {
num: u64,
@ -46,6 +53,27 @@ enum State {
Ready { until: Instant, rem: u64 },
}
impl RateLimitLayer {
pub fn new(num: u64, per: Duration) -> Self {
let rate = Rate { num, per };
RateLimitLayer { rate }
}
}
impl<S, Request> Layer<S, Request> for RateLimitLayer
where
S: Service<Request>,
{
type Response = S::Response;
type Error = Error<S::Error>;
type LayerError = ();
type Service = RateLimit<S>;
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
Ok(RateLimit::new(service, self.rate))
}
}
impl<T> RateLimit<T> {
/// Create a new rate limiter
pub fn new<Request>(inner: T, rate: Rate) -> Self

View File

@ -7,6 +7,7 @@ publish = false
[dependencies]
futures = "0.1"
tower-service = { version = "0.2", path = "../tower-service" }
tower-layer = { version = "0.1", path = "../tower-layer" }
tokio-timer = "0.2.4"
[dev-dependencies]

View File

@ -7,9 +7,11 @@
#[macro_use]
extern crate futures;
extern crate tokio_timer;
extern crate tower_layer;
extern crate tower_service;
use futures::{Async, Future, Poll};
use tower_layer::Layer;
use tower_service::Service;
pub mod budget;
@ -89,6 +91,12 @@ pub struct Retry<P, S> {
service: S,
}
/// Retry requests based on a policy
#[derive(Debug)]
pub struct RetryLayer<P> {
policy: P,
}
/// The `Future` returned by a `Retry` service.
#[derive(Debug)]
pub struct ResponseFuture<P, S, Request>
@ -111,6 +119,31 @@ enum State<F, P, R, E> {
Retrying,
}
// ===== impl RetryLayer =====
impl<P> RetryLayer<P> {
/// Create a new `RetryLayer` from a retry policy
pub fn new(policy: P) -> Self {
RetryLayer { policy }
}
}
impl<P, S, Request> Layer<S, Request> for RetryLayer<P>
where
S: Service<Request> + Clone,
P: Policy<Request, S::Response, S::Error> + Clone,
{
type Response = S::Response;
type Error = S::Error;
type LayerError = ();
type Service = Retry<P, S>;
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
let policy = self.policy.clone();
Ok(Retry::new(policy, service))
}
}
// ===== impl Retry =====
impl<P, S> Retry<P, S> {

View File

@ -80,7 +80,7 @@ use futures::{Future, Poll};
/// println!("Redis response: {:?}", await(resp));
/// ```
///
/// # Middleware
/// # Middleware / Layer
///
/// More often than not, all the pieces needed for writing robust, scalable
/// network applications are the same no matter the underlying protocol. By
@ -92,6 +92,7 @@ use futures::{Future, Poll};
///
/// ```rust,ignore
/// use tower_service::Service;
/// use tower_layer::Layer;
/// use futures::Future;
/// use std::time::Duration;
///
@ -103,6 +104,8 @@ use futures::{Future, Poll};
/// timer: Timer,
/// }
///
/// pub struct TimeoutLayer(Duration);
///
/// pub struct Expired;
///
/// impl<T> Timeout<T> {
@ -140,6 +143,25 @@ use futures::{Future, Poll};
/// }
/// }
///
/// impl TimeoutLayer {
/// pub fn new(delay: Duration) -> Self {
/// TimeoutLayer(delay)
/// }
/// }
///
/// impl<S, Request> Layer<S, Request> for TimeoutLayer
/// where
/// S: Service<Request>,
/// {
/// type Response = S::Response;
/// type Error = S::Error;
/// type Service = Timeout<S>;
///
/// fn layer(&self, service: S) -> Timeout<S> {
/// Timeout::new(service, self.0)
/// }
/// }
///
/// ```
///
/// The above timeout implementation is decoupled from the underlying protocol

View File

@ -7,4 +7,5 @@ publish = false
[dependencies]
futures = "0.1"
tower-service = { version = "0.2", path = "../tower-service" }
tower-layer = { version = "0.1", path = "../tower-layer" }
tokio-timer = "0.2.6"

View File

@ -9,10 +9,12 @@
extern crate futures;
extern crate tokio_timer;
extern crate tower_layer;
extern crate tower_service;
use futures::{Async, Future, Poll};
use tokio_timer::{clock, Delay};
use tower_layer::Layer;
use tower_service::Service;
use std::time::Duration;
@ -28,6 +30,12 @@ pub struct Timeout<T> {
timeout: Duration,
}
/// Applies a timeout to requests via the supplied inner service.
#[derive(Debug)]
pub struct TimeoutLayer {
timeout: Duration,
}
/// `Timeout` response future
#[derive(Debug)]
pub struct ResponseFuture<T> {
@ -35,6 +43,30 @@ pub struct ResponseFuture<T> {
sleep: Delay,
}
// ===== impl TimeoutLayer =====
impl TimeoutLayer {
/// Create a timeout from a duration
pub fn new(timeout: Duration) -> Self {
TimeoutLayer { timeout }
}
}
impl<S, Request> Layer<S, Request> for TimeoutLayer
where
S: Service<Request>,
S::Error: Into<Error>,
{
type Response = S::Response;
type Error = Error;
type LayerError = ();
type Service = Timeout<S>;
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
Ok(Timeout::new(service, self.timeout))
}
}
// ===== impl Timeout =====
impl<T> Timeout<T> {