Introduce the `ServiceBuilder` (#175)
The `ServiceBuilder` composes layers together and produces either a `MakeService` or a `Service` wrapped by those layers.
This commit is contained in:
parent
bec3937e87
commit
476f085c89
|
@ -17,3 +17,6 @@ members = [
|
|||
"tower-service-util",
|
||||
"tower-timeout",
|
||||
]
|
||||
|
||||
[patch.'https://github.com/tower-rs/tower']
|
||||
tower-retry = { path = "tower-retry" }
|
|
@ -1,6 +1,6 @@
|
|||
use tower_layer::Layer;
|
||||
use tower_service::Service;
|
||||
use {Error, InFlightLimit};
|
||||
use {Error, InFlightLimit, Never};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct InFlightLimitLayer {
|
||||
|
@ -20,7 +20,7 @@ where
|
|||
{
|
||||
type Response = S::Response;
|
||||
type Error = Error;
|
||||
type LayerError = ();
|
||||
type LayerError = Never;
|
||||
type Service = InFlightLimit<S>;
|
||||
|
||||
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
|
||||
|
|
|
@ -9,9 +9,11 @@ extern crate tower_service;
|
|||
|
||||
pub mod future;
|
||||
mod layer;
|
||||
mod never;
|
||||
|
||||
use future::ResponseFuture;
|
||||
pub use layer::InFlightLimitLayer;
|
||||
use never::Never;
|
||||
|
||||
use tower_service::Service;
|
||||
|
||||
|
@ -31,7 +33,7 @@ struct Limit {
|
|||
permit: semaphore::Permit,
|
||||
}
|
||||
|
||||
type Error = Box<::std::error::Error + Send + Sync>;
|
||||
type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||
|
||||
// ===== impl InFlightLimit =====
|
||||
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
use std::fmt;
|
||||
#[derive(Debug)]
|
||||
/// An error that can never occur.
|
||||
pub enum Never {}
|
||||
|
||||
impl fmt::Display for Never {
|
||||
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for Never {}
|
|
@ -20,6 +20,9 @@ categories = ["asynchronous", "network-programming"]
|
|||
futures = "0.1"
|
||||
tower-service = "0.2.0"
|
||||
|
||||
[dev-dependencies]
|
||||
void = "1"
|
||||
|
||||
[features]
|
||||
default = ["util"]
|
||||
util = []
|
||||
|
|
|
@ -34,10 +34,12 @@ use tower_service::Service;
|
|||
/// ```rust
|
||||
/// # extern crate futures;
|
||||
/// # extern crate tower_service;
|
||||
/// # extern crate void;
|
||||
/// # use tower_service::Service;
|
||||
/// # use futures::{Poll, Async};
|
||||
/// # use tower_layer::Layer;
|
||||
/// # use std::fmt;
|
||||
/// # use void::Void;
|
||||
///
|
||||
/// pub struct LogLayer {
|
||||
/// target: &'static str,
|
||||
|
@ -50,7 +52,7 @@ use tower_service::Service;
|
|||
/// {
|
||||
/// type Response = S::Response;
|
||||
/// type Error = S::Error;
|
||||
/// type LayerError = ();
|
||||
/// type LayerError = Void;
|
||||
/// type Service = LogService<S>;
|
||||
///
|
||||
/// fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
|
||||
|
|
|
@ -10,13 +10,7 @@ pub struct Chain<Inner, Outer> {
|
|||
outer: Outer,
|
||||
}
|
||||
|
||||
/// Error's produced when chaining two layers together
|
||||
pub enum ChainError<I, O> {
|
||||
/// Error produced from the inner layer call
|
||||
Inner(I),
|
||||
/// Error produced from the outer layer call
|
||||
Outer(O),
|
||||
}
|
||||
type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||
|
||||
impl<Inner, Outer> Chain<Inner, Outer> {
|
||||
/// Create a new `Chain`.
|
||||
|
@ -29,19 +23,18 @@ impl<S, Request, Inner, Outer> Layer<S, Request> for Chain<Inner, Outer>
|
|||
where
|
||||
S: Service<Request>,
|
||||
Inner: Layer<S, Request>,
|
||||
Inner::LayerError: Into<Error>,
|
||||
Outer: Layer<Inner::Service, Request>,
|
||||
Outer::LayerError: Into<Error>,
|
||||
{
|
||||
type Response = Outer::Response;
|
||||
type Error = Outer::Error;
|
||||
type LayerError = ChainError<Inner::LayerError, Outer::LayerError>;
|
||||
type LayerError = Error;
|
||||
type Service = Outer::Service;
|
||||
|
||||
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
|
||||
let inner = self
|
||||
.inner
|
||||
.layer(service)
|
||||
.map_err(|e| ChainError::Inner(e))?;
|
||||
let inner = self.inner.layer(service).map_err(Into::into)?;
|
||||
|
||||
self.outer.layer(inner).map_err(|e| ChainError::Outer(e))
|
||||
self.outer.layer(inner).map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use tower_service::Service;
|
||||
use Layer;
|
||||
|
|
|
@ -5,7 +5,7 @@ use Layer;
|
|||
mod chain;
|
||||
mod identity;
|
||||
|
||||
pub use self::chain::{Chain, ChainError};
|
||||
pub use self::chain::Chain;
|
||||
pub use self::identity::Identity;
|
||||
|
||||
/// An extension trait for `Layer`'s that provides a variety of convenient
|
||||
|
@ -17,7 +17,7 @@ pub trait LayerExt<S, Request>: Layer<S, Request> {
|
|||
/// This defines a middleware stack.
|
||||
fn chain<T>(self, middleware: T) -> Chain<Self, T>
|
||||
where
|
||||
T: Layer<S, Request>,
|
||||
T: Layer<Self::Service, Request>,
|
||||
Self: Sized,
|
||||
{
|
||||
Chain::new(self, middleware)
|
||||
|
|
|
@ -15,6 +15,9 @@ use tower_layer::Layer;
|
|||
use tower_service::Service;
|
||||
|
||||
pub mod budget;
|
||||
mod never;
|
||||
|
||||
use never::Never;
|
||||
|
||||
/// A "retry policy" to classify if a request should be retried.
|
||||
///
|
||||
|
@ -135,7 +138,7 @@ where
|
|||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type LayerError = ();
|
||||
type LayerError = Never;
|
||||
type Service = Retry<P, S>;
|
||||
|
||||
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
use std::fmt;
|
||||
#[derive(Debug)]
|
||||
/// An error that can never occur.
|
||||
pub enum Never {}
|
||||
|
||||
impl fmt::Display for Never {
|
||||
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for Never {}
|
|
@ -1,8 +1,8 @@
|
|||
use crate::{Error, Timeout};
|
||||
use never::Never;
|
||||
use std::time::Duration;
|
||||
use tower_layer::Layer;
|
||||
use tower_service::Service;
|
||||
|
||||
/// Applies a timeout to requests via the supplied inner service.
|
||||
#[derive(Debug)]
|
||||
pub struct TimeoutLayer {
|
||||
|
@ -23,7 +23,7 @@ where
|
|||
{
|
||||
type Response = S::Response;
|
||||
type Error = Error;
|
||||
type LayerError = ();
|
||||
type LayerError = Never;
|
||||
type Service = Timeout<S>;
|
||||
|
||||
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
|
||||
|
|
|
@ -15,6 +15,7 @@ extern crate tower_service;
|
|||
pub mod error;
|
||||
pub mod future;
|
||||
mod layer;
|
||||
mod never;
|
||||
|
||||
pub use crate::layer::TimeoutLayer;
|
||||
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
use std::fmt;
|
||||
#[derive(Debug)]
|
||||
/// An error that can never occur.
|
||||
pub enum Never {}
|
||||
|
||||
impl fmt::Display for Never {
|
||||
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for Never {}
|
|
@ -16,13 +16,25 @@ keywords = ["io", "async", "non-blocking", "futures", "service"]
|
|||
[dependencies]
|
||||
futures = "0.1"
|
||||
tower-service = "0.2.0"
|
||||
tower-layer = { version = "0.1", path = "../tower-layer" }
|
||||
tower-service-util = { version = "0.1.0", path = "../tower-service-util", features = ["io", "either"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tower-service = "0.2.0"
|
||||
futures = "0.1"
|
||||
tower-service = "0.2"
|
||||
tower-in-flight-limit = { version = "0.1", path = "../tower-in-flight-limit" }
|
||||
tower-rate-limit = { version = "0.1", path = "../tower-rate-limit" }
|
||||
tower-reconnect = { version = "0.1", path = "../tower-reconnect" }
|
||||
tower-retry = { version = "0.1", path = "../tower-retry" }
|
||||
tower-buffer = { version = "0.1", path = "../tower-buffer" }
|
||||
tower-hyper = { git = "https://github.com/tower-rs/tower-hyper" }
|
||||
tower-layer = { version = "0.1", path = "../tower-layer" }
|
||||
tokio-tcp = "0.1"
|
||||
hyper = "0.12"
|
||||
log = "0.4.1"
|
||||
tokio = "0.1"
|
||||
env_logger = { version = "0.5.3", default-features = false }
|
||||
tokio-timer = "0.1"
|
||||
futures-cpupool = "0.1"
|
||||
tokio-mock-task = "0.1"
|
||||
void = "1"
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
extern crate futures;
|
||||
extern crate hyper;
|
||||
extern crate tower;
|
||||
extern crate tower_buffer;
|
||||
extern crate tower_hyper;
|
||||
extern crate tower_in_flight_limit;
|
||||
extern crate tower_rate_limit;
|
||||
extern crate tower_reconnect;
|
||||
extern crate tower_retry;
|
||||
extern crate tower_service;
|
||||
|
||||
use futures::Future;
|
||||
use hyper::client::connect::Destination;
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::{Request, Response, Uri};
|
||||
use std::time::Duration;
|
||||
use tower::builder::ServiceBuilder;
|
||||
use tower::ServiceExt;
|
||||
use tower_buffer::BufferLayer;
|
||||
use tower_hyper::client::{Builder, Connect};
|
||||
use tower_hyper::retry::{Body, RetryPolicy};
|
||||
use tower_hyper::util::Connector;
|
||||
use tower_in_flight_limit::InFlightLimitLayer;
|
||||
use tower_rate_limit::RateLimitLayer;
|
||||
use tower_reconnect::Reconnect;
|
||||
use tower_retry::RetryLayer;
|
||||
use tower_service::Service;
|
||||
|
||||
fn main() {
|
||||
let fut = futures::lazy(|| {
|
||||
request().map(|resp| {
|
||||
dbg!(resp);
|
||||
})
|
||||
});
|
||||
hyper::rt::run(fut)
|
||||
}
|
||||
|
||||
fn request() -> impl Future<Item = Response<hyper::Body>, Error = ()> {
|
||||
let connector = Connector::new(HttpConnector::new(1));
|
||||
let hyper = Connect::new(connector, Builder::new());
|
||||
|
||||
// RetryPolicy is a very simple policy that retries `n` times
|
||||
// if the response has a 500 status code. Here, `n` is 5.
|
||||
let policy = RetryPolicy::new(5);
|
||||
// We're calling the tower/examples/server.rs.
|
||||
let dst = Destination::try_from_uri(Uri::from_static("http://127.0.0.1:3000")).unwrap();
|
||||
|
||||
// Now, to build the service! We use two BufferLayers in order to:
|
||||
// - provide backpressure for the RateLimitLayer, RateLimitLayer, and InFlightLimitLayer
|
||||
// - meet `RetryLayer`'s requirement that our service implement `Service + Clone`
|
||||
// - ..and to provide cheap clones on the service.
|
||||
let maker = ServiceBuilder::new()
|
||||
.layer(BufferLayer::new(5))
|
||||
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
|
||||
.layer(InFlightLimitLayer::new(5))
|
||||
.layer(RetryLayer::new(policy))
|
||||
.layer(BufferLayer::new(5))
|
||||
.build_make_service(hyper);
|
||||
|
||||
// `Reconnect` accepts a destination and a MakeService, creating a new service
|
||||
// any time the connection encounters an error.
|
||||
let client = Reconnect::new(maker, dst);
|
||||
|
||||
let request = Request::builder()
|
||||
.method("GET")
|
||||
.body(Body::from(Vec::new()))
|
||||
.unwrap();
|
||||
|
||||
// we check to see if the client is ready to accept requests.
|
||||
client
|
||||
.ready()
|
||||
.map_err(|e| panic!("Service is not ready: {:?}", e))
|
||||
.and_then(|mut c| c.call(request).map_err(|e| panic!("{:?}", e)))
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
extern crate futures;
|
||||
extern crate hyper;
|
||||
extern crate tokio_tcp;
|
||||
extern crate tower;
|
||||
extern crate tower_hyper;
|
||||
extern crate tower_in_flight_limit;
|
||||
extern crate tower_service;
|
||||
|
||||
use futures::{future, Future, Poll, Stream};
|
||||
use hyper::{Body, Request, Response};
|
||||
use tokio_tcp::TcpListener;
|
||||
use tower::builder::ServiceBuilder;
|
||||
use tower_hyper::body::LiftBody;
|
||||
use tower_hyper::server::Server;
|
||||
use tower_in_flight_limit::InFlightLimitLayer;
|
||||
use tower_service::Service;
|
||||
|
||||
fn main() {
|
||||
hyper::rt::run(future::lazy(|| {
|
||||
let addr = "127.0.0.1:3000".parse().unwrap();
|
||||
let bind = TcpListener::bind(&addr).expect("bind");
|
||||
|
||||
println!("Listening on http://{}", addr);
|
||||
|
||||
let maker = ServiceBuilder::new()
|
||||
.layer(InFlightLimitLayer::new(5))
|
||||
.build_make_service(MakeSvc);
|
||||
|
||||
let server = Server::new(maker);
|
||||
|
||||
bind.incoming()
|
||||
.fold(server, |mut server, stream| {
|
||||
if let Err(e) = stream.set_nodelay(true) {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
hyper::rt::spawn(
|
||||
server
|
||||
.serve(stream)
|
||||
.map_err(|e| panic!("Server error {:?}", e)),
|
||||
);
|
||||
|
||||
Ok(server)
|
||||
})
|
||||
.map_err(|e| panic!("serve error: {:?}", e))
|
||||
.map(|_| {})
|
||||
}));
|
||||
}
|
||||
|
||||
struct Svc;
|
||||
impl Service<Request<Body>> for Svc {
|
||||
type Response = Response<LiftBody<Body>>;
|
||||
type Error = hyper::Error;
|
||||
type Future = future::FutureResult<Self::Response, Self::Error>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(().into())
|
||||
}
|
||||
|
||||
fn call(&mut self, _req: Request<Body>) -> Self::Future {
|
||||
let res = Response::new(LiftBody::new(Body::from("Hello World!")));
|
||||
future::ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
struct MakeSvc;
|
||||
impl Service<()> for MakeSvc {
|
||||
type Response = Svc;
|
||||
type Error = hyper::Error;
|
||||
type Future = Box<Future<Item = Self::Response, Error = Self::Error> + Send + 'static>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(().into())
|
||||
}
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
Box::new(future::ok(Svc))
|
||||
}
|
||||
}
|
|
@ -0,0 +1,193 @@
|
|||
//! Builder types to compose layers and services
|
||||
|
||||
mod service;
|
||||
|
||||
pub use self::service::{LayeredMakeService, ServiceFuture};
|
||||
|
||||
use tower_layer::{
|
||||
util::{Chain, Identity},
|
||||
Layer,
|
||||
};
|
||||
use tower_service::Service;
|
||||
use tower_service_util::MakeService;
|
||||
|
||||
pub(super) type Error = Box<::std::error::Error + Send + Sync>;
|
||||
|
||||
/// `ServiceBuilder` provides a [builder-like interface](https://doc.rust-lang.org/1.0.0/style/ownership/builders.html) for composing Layers and a connection, where the latter is modeled by
|
||||
/// a `MakeService`. The builder produces either a new `Service` or `MakeService`,
|
||||
/// depending on whether `build_service` or `build_maker` is called.
|
||||
///
|
||||
/// # Services and MakeServices
|
||||
///
|
||||
/// - A [`Service`](tower_service::Service) is a trait representing an asynchronous
|
||||
/// function of a request to a response. It is similar to
|
||||
/// `async fn(Request) -> Result<Response, Error>`.
|
||||
/// - A [`MakeService`](tower_service_util::MakeService) is a trait creating specific
|
||||
/// instances of a `Service`
|
||||
///
|
||||
/// # Service
|
||||
///
|
||||
/// A `Service` is typically bound to a single transport, such as a TCP connection.
|
||||
/// It defines how _all_ inbound or outbound requests are handled by that connection.
|
||||
///
|
||||
/// # MakeService
|
||||
///
|
||||
/// Since a `Service` is bound to a single connection, a `MakeService` allows for the
|
||||
/// creation of _new_ `Service`s that'll be bound to _different_ different connections.
|
||||
/// This is useful for servers, as they require the ability to accept new connections.
|
||||
///
|
||||
/// Resources that need to be shared by all `Service`s can be put into a
|
||||
/// `MakeService`, and then passed to individual `Service`s when `build_maker`
|
||||
/// is called.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// A MakeService stack with a single layer:
|
||||
///
|
||||
/// ```rust
|
||||
/// # extern crate tower;
|
||||
/// # extern crate tower_in_flight_limit;
|
||||
/// # extern crate futures;
|
||||
/// # extern crate void;
|
||||
/// # use void::Void;
|
||||
/// # use tower::Service;
|
||||
/// # use tower::builder::ServiceBuilder;
|
||||
/// # use tower_in_flight_limit::InFlightLimitLayer;
|
||||
/// # use futures::{Poll, future::{self, FutureResult}};
|
||||
/// # #[derive(Debug)]
|
||||
/// # struct MyMakeService;
|
||||
/// # impl Service<()> for MyMakeService {
|
||||
/// # type Response = MyService;
|
||||
/// # type Error = Void;
|
||||
/// # type Future = FutureResult<Self::Response, Self::Error>;
|
||||
/// # fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
/// # Ok(().into())
|
||||
/// # }
|
||||
/// # fn call(&mut self, _: ()) -> Self::Future {
|
||||
/// # future::ok(MyService)
|
||||
/// # }
|
||||
/// # }
|
||||
/// # #[derive(Debug)]
|
||||
/// # struct MyService;
|
||||
/// # impl Service<()> for MyService {
|
||||
/// # type Response = ();
|
||||
/// # type Error = Void;
|
||||
/// # type Future = FutureResult<Self::Response, Self::Error>;
|
||||
/// # fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
/// # Ok(().into())
|
||||
/// # }
|
||||
/// # fn call(&mut self, _: ()) -> Self::Future {
|
||||
/// # future::ok(())
|
||||
/// # }
|
||||
/// # }
|
||||
/// ServiceBuilder::new()
|
||||
/// .layer(InFlightLimitLayer::new(5))
|
||||
/// .build_make_service(MyMakeService);
|
||||
/// ```
|
||||
///
|
||||
/// A `Service` stack with a single layer:
|
||||
///
|
||||
/// ```
|
||||
/// # extern crate tower;
|
||||
/// # extern crate tower_in_flight_limit;
|
||||
/// # extern crate futures;
|
||||
/// # extern crate void;
|
||||
/// # use void::Void;
|
||||
/// # use tower::Service;
|
||||
/// # use tower::builder::ServiceBuilder;
|
||||
/// # use tower_in_flight_limit::InFlightLimitLayer;
|
||||
/// # use futures::{Poll, future::{self, FutureResult}};
|
||||
/// # #[derive(Debug)]
|
||||
/// # struct MyService;
|
||||
/// # impl Service<()> for MyService {
|
||||
/// # type Response = ();
|
||||
/// # type Error = Void;
|
||||
/// # type Future = FutureResult<Self::Response, Self::Error>;
|
||||
/// # fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
/// # Ok(().into())
|
||||
/// # }
|
||||
/// # fn call(&mut self, _: ()) -> Self::Future {
|
||||
/// # future::ok(())
|
||||
/// # }
|
||||
/// # }
|
||||
/// ServiceBuilder::new()
|
||||
/// .layer(InFlightLimitLayer::new(5))
|
||||
/// .build_service(MyService);
|
||||
/// ```
|
||||
///
|
||||
/// A `Service` stack with _multiple_ layers that contain rate limiting, in-flight request limits,
|
||||
/// and a channel-backed, clonable `Service`:
|
||||
///
|
||||
/// ```
|
||||
/// # extern crate tower;
|
||||
/// # extern crate tower_in_flight_limit;
|
||||
/// # extern crate tower_buffer;
|
||||
/// # extern crate tower_rate_limit;
|
||||
/// # extern crate futures;
|
||||
/// # extern crate void;
|
||||
/// # use void::Void;
|
||||
/// # use tower::Service;
|
||||
/// # use tower::builder::ServiceBuilder;
|
||||
/// # use tower_in_flight_limit::InFlightLimitLayer;
|
||||
/// # use tower_buffer::BufferLayer;
|
||||
/// # use tower_rate_limit::RateLimitLayer;
|
||||
/// # use std::time::Duration;
|
||||
/// # use futures::{Poll, future::{self, FutureResult}};
|
||||
/// # #[derive(Debug)]
|
||||
/// # struct MyService;
|
||||
/// # impl Service<()> for MyService {
|
||||
/// # type Response = ();
|
||||
/// # type Error = Void;
|
||||
/// # type Future = FutureResult<Self::Response, Self::Error>;
|
||||
/// # fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
/// # Ok(().into())
|
||||
/// # }
|
||||
/// # fn call(&mut self, _: ()) -> Self::Future {
|
||||
/// # future::ok(())
|
||||
/// # }
|
||||
/// # }
|
||||
/// ServiceBuilder::new()
|
||||
/// .layer(BufferLayer::new(5))
|
||||
/// .layer(InFlightLimitLayer::new(5))
|
||||
/// .layer(RateLimitLayer::new(5, Duration::from_secs(1)))
|
||||
/// .build_service(MyService);
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct ServiceBuilder<L> {
|
||||
layer: L,
|
||||
}
|
||||
|
||||
impl ServiceBuilder<Identity> {
|
||||
/// Create a new `ServiceBuilder` from a `MakeService`.
|
||||
pub fn new() -> Self {
|
||||
ServiceBuilder {
|
||||
layer: Identity::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L> ServiceBuilder<L> {
|
||||
/// Layer a new layer `T` onto the `ServiceBuilder`.
|
||||
pub fn layer<T>(self, layer: T) -> ServiceBuilder<Chain<T, L>> {
|
||||
ServiceBuilder {
|
||||
layer: Chain::new(layer, self.layer),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a `LayeredMakeService` from the composed layers and transport `MakeService`.
|
||||
pub fn build_make_service<M, Target, Request>(self, mk: M) -> LayeredMakeService<M, L, Request>
|
||||
where
|
||||
M: MakeService<Target, Request>,
|
||||
{
|
||||
LayeredMakeService::new(mk, self.layer)
|
||||
}
|
||||
|
||||
/// Wrap the service `S` with the layers.
|
||||
pub fn build_service<S, Request>(self, service: S) -> Result<L::Service, L::LayerError>
|
||||
where
|
||||
L: Layer<S, Request>,
|
||||
S: Service<Request>,
|
||||
{
|
||||
self.layer.layer(service)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
use super::Error;
|
||||
use futures::{Async, Future, Poll};
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use tower_layer::Layer;
|
||||
use tower_service_util::MakeService;
|
||||
use Service;
|
||||
|
||||
/// Composed `MakeService` produced from `ServiceBuilder`
|
||||
#[derive(Debug)]
|
||||
pub struct LayeredMakeService<S, L, Request> {
|
||||
maker: S,
|
||||
layer: Arc<L>,
|
||||
_pd: PhantomData<Request>,
|
||||
}
|
||||
|
||||
/// Async resolve the MakeService and wrap it with the layers
|
||||
#[derive(Debug)]
|
||||
pub struct ServiceFuture<S, L, Target, Request>
|
||||
where
|
||||
S: MakeService<Target, Request>,
|
||||
{
|
||||
inner: S::Future,
|
||||
layer: Arc<L>,
|
||||
}
|
||||
|
||||
impl<S, L, Request> LayeredMakeService<S, L, Request> {
|
||||
pub(crate) fn new(maker: S, layer: L) -> Self {
|
||||
LayeredMakeService {
|
||||
maker,
|
||||
layer: Arc::new(layer),
|
||||
_pd: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, L, Target, Request> Service<Target> for LayeredMakeService<S, L, Request>
|
||||
where
|
||||
S: MakeService<Target, Request>,
|
||||
S::MakeError: Into<Error>,
|
||||
L: Layer<S::Service, Request> + Sync + Send + 'static,
|
||||
L::LayerError: Into<Error>,
|
||||
Target: Clone,
|
||||
{
|
||||
type Response = L::Service;
|
||||
type Error = Error;
|
||||
type Future = ServiceFuture<S, L, Target, Request>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.maker.poll_ready().map_err(Into::into)
|
||||
}
|
||||
|
||||
fn call(&mut self, target: Target) -> Self::Future {
|
||||
let inner = self.maker.make_service(target);
|
||||
let layer = Arc::clone(&self.layer);
|
||||
|
||||
ServiceFuture { inner, layer }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, L, Target, Request> Future for ServiceFuture<S, L, Target, Request>
|
||||
where
|
||||
S: MakeService<Target, Request>,
|
||||
S::MakeError: Into<Error>,
|
||||
L: Layer<S::Service, Request>,
|
||||
L::LayerError: Into<Error>,
|
||||
{
|
||||
type Item = L::Service;
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let service = try_ready!(self.inner.poll().map_err(Into::into));
|
||||
|
||||
match self.layer.layer(service) {
|
||||
Ok(service) => Ok(Async::Ready(service)),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,14 +1,17 @@
|
|||
//! Various utility types and functions that are generally with Tower.
|
||||
|
||||
pub mod builder;
|
||||
#[macro_use]
|
||||
extern crate futures;
|
||||
#[cfg(test)]
|
||||
extern crate tokio_mock_task;
|
||||
extern crate tower_layer;
|
||||
extern crate tower_service;
|
||||
extern crate tower_service_util;
|
||||
|
||||
pub mod util;
|
||||
|
||||
pub use builder::ServiceBuilder;
|
||||
pub use tower_service::Service;
|
||||
pub use tower_service_util::MakeConnection;
|
||||
pub use tower_service_util::MakeService;
|
||||
|
|
|
@ -0,0 +1,136 @@
|
|||
extern crate futures;
|
||||
extern crate tokio;
|
||||
extern crate tower;
|
||||
extern crate tower_buffer;
|
||||
extern crate tower_in_flight_limit;
|
||||
extern crate tower_rate_limit;
|
||||
extern crate tower_reconnect;
|
||||
extern crate tower_retry;
|
||||
extern crate tower_service;
|
||||
extern crate void;
|
||||
|
||||
use futures::future::{self, FutureResult};
|
||||
use futures::prelude::*;
|
||||
use std::time::Duration;
|
||||
use tower::builder::ServiceBuilder;
|
||||
use tower_buffer::BufferLayer;
|
||||
use tower_in_flight_limit::InFlightLimitLayer;
|
||||
use tower_rate_limit::RateLimitLayer;
|
||||
use tower_reconnect::Reconnect;
|
||||
use tower_retry::{Policy, RetryLayer};
|
||||
use tower_service::*;
|
||||
use void::Void;
|
||||
|
||||
#[test]
|
||||
fn builder_make_service() {
|
||||
tokio::run(future::lazy(|| {
|
||||
let maker = ServiceBuilder::new()
|
||||
.layer(BufferLayer::new(5))
|
||||
.layer(InFlightLimitLayer::new(5))
|
||||
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
|
||||
.build_make_service(MockMaker);
|
||||
|
||||
let mut client = Reconnect::new(maker, ());
|
||||
|
||||
client.poll_ready().unwrap();
|
||||
client
|
||||
.call(Request)
|
||||
.map(|_| ())
|
||||
.map_err(|_| panic!("this is bad"))
|
||||
}));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn builder_service() {
|
||||
tokio::run(future::lazy(|| {
|
||||
let mut client = ServiceBuilder::new()
|
||||
.layer(BufferLayer::new(5))
|
||||
.layer(InFlightLimitLayer::new(5))
|
||||
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
|
||||
.build_service(MockSvc)
|
||||
.unwrap();
|
||||
|
||||
client.poll_ready().unwrap();
|
||||
client
|
||||
.call(Request)
|
||||
.map(|_| ())
|
||||
.map_err(|_| panic!("this is bad"))
|
||||
}));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn builder_make_service_retry() {
|
||||
tokio::run(future::lazy(|| {
|
||||
let policy = MockPolicy;
|
||||
|
||||
let maker = ServiceBuilder::new()
|
||||
.layer(BufferLayer::new(5))
|
||||
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
|
||||
.layer(InFlightLimitLayer::new(5))
|
||||
.layer(RetryLayer::new(policy))
|
||||
.layer(BufferLayer::new(5))
|
||||
.build_make_service(MockMaker);
|
||||
|
||||
let mut client = Reconnect::new(maker, ());
|
||||
|
||||
client.poll_ready().unwrap();
|
||||
client
|
||||
.call(Request)
|
||||
.map(|_| ())
|
||||
.map_err(|_| panic!("this is bad"))
|
||||
}));
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MockMaker;
|
||||
impl Service<()> for MockMaker {
|
||||
type Response = MockSvc;
|
||||
type Error = Void;
|
||||
type Future = FutureResult<Self::Response, Self::Error>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(().into())
|
||||
}
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
future::ok(MockSvc)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Request;
|
||||
#[derive(Debug, Clone)]
|
||||
struct Response;
|
||||
#[derive(Debug)]
|
||||
struct MockSvc;
|
||||
impl Service<Request> for MockSvc {
|
||||
type Response = Response;
|
||||
type Error = Void;
|
||||
type Future = FutureResult<Self::Response, Self::Error>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(().into())
|
||||
}
|
||||
|
||||
fn call(&mut self, _: Request) -> Self::Future {
|
||||
future::ok(Response)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct MockPolicy;
|
||||
|
||||
impl<E> Policy<Request, Response, E> for MockPolicy
|
||||
where
|
||||
E: Into<Box<std::error::Error + Send + Sync + 'static>>,
|
||||
{
|
||||
type Future = FutureResult<Self, ()>;
|
||||
|
||||
fn retry(&self, _req: &Request, _result: Result<&Response, &E>) -> Option<Self::Future> {
|
||||
None
|
||||
}
|
||||
|
||||
fn clone_request(&self, req: &Request) -> Option<Request> {
|
||||
Some(req.clone())
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue