Introduce tower-load-shed (#204)

Provides a middleware that immediately fails any request if the
underlying service isn't ready yet. This is useful when used in
conjunction with `InFlightLimit` or others, so that requests are
rejected immediately, instead of keeping track of extra pending
requests.
This commit is contained in:
Sean McArthur 2019-03-20 15:48:10 -07:00 committed by GitHub
parent 5607313192
commit db2f0ecfb3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 288 additions and 1 deletions

View File

@ -7,8 +7,9 @@ members = [
"tower-discover",
"tower-filter",
"tower-in-flight-limit",
"tower-mock",
"tower-layer",
"tower-load-shed",
"tower-mock",
"tower-rate-limit",
"tower-reconnect",
"tower-retry",

View File

@ -0,0 +1,14 @@
[package]
name = "tower-load-shed"
version = "0.1.0"
authors = ["Sean McArthur <sean@seanmonstar.com>"]
publish = false
[dependencies]
futures = "0.1.25"
tower-service = "0.2.0"
tower-layer = { version = "0.1.0", path = "../tower-layer" }
[dev-dependencies]
tokio-mock-task = "0.1.1"
tower-mock = { version = "0.1", path = "../tower-mock" }

View File

@ -0,0 +1,4 @@
Tower Load Shed
A Tower middleware rejects requests immediately if the underlying service is
not ready, known as load-shedding.

View File

@ -0,0 +1,48 @@
//! Error types
use std::fmt;
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
pub(crate) use self::never::Never;
/// An error returned by `Overload` when the underlying service
/// is not ready to handle any requests at the time of being
/// called.
pub struct Overloaded {
_p: (),
}
impl Overloaded {
pub(crate) fn new() -> Self {
Overloaded { _p: () }
}
}
impl fmt::Debug for Overloaded {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("Overloaded")
}
}
impl fmt::Display for Overloaded {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("service overloaded")
}
}
impl std::error::Error for Overloaded {}
pub(crate) mod never {
use std::{error, fmt};
#[derive(Debug)]
pub enum Never {}
impl fmt::Display for Never {
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
match *self {}
}
}
impl error::Error for Never {}
}

View File

@ -0,0 +1,46 @@
use std::fmt;
use futures::{Future, Poll};
use error::{Error, Overloaded};
/// Future for the `LoadShed` service.
pub struct ResponseFuture<F> {
state: Result<F, ()>,
}
impl<F> ResponseFuture<F> {
pub(crate) fn called(fut: F) -> Self {
ResponseFuture { state: Ok(fut) }
}
pub(crate) fn overloaded() -> Self {
ResponseFuture { state: Err(()) }
}
}
impl<F> Future for ResponseFuture<F>
where
F: Future,
F::Error: Into<Error>,
{
type Item = F::Item;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.state {
Ok(ref mut fut) => fut.poll().map_err(Into::into),
Err(()) => Err(Overloaded::new().into()),
}
}
}
impl<F> fmt::Debug for ResponseFuture<F>
where
// bounds for future-proofing...
F: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("ResponseFuture")
}
}

View File

@ -0,0 +1,33 @@
use tower_layer::Layer;
use tower_service::Service;
use error::{Error, Never};
use LoadShed;
/// A `tower-layer` to wrap services in `LoadShed` middleware.
#[derive(Debug)]
pub struct LoadShedLayer {
_p: (),
}
impl LoadShedLayer {
/// Creates a new layer.
pub fn new() -> Self {
LoadShedLayer { _p: () }
}
}
impl<S, Req> Layer<S, Req> for LoadShedLayer
where
S: Service<Req>,
S::Error: Into<Error>,
{
type Response = S::Response;
type Error = Error;
type LayerError = Never;
type Service = LoadShed<S>;
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
Ok(LoadShed::new(service))
}
}

View File

@ -0,0 +1,80 @@
#![cfg_attr(test, deny(warnings))]
#![deny(missing_debug_implementations)]
#![deny(missing_docs)]
//! tower-load-shed
extern crate futures;
extern crate tower_layer;
extern crate tower_service;
use futures::Poll;
use tower_service::Service;
pub mod error;
mod future;
mod layer;
use self::error::Error;
pub use self::future::ResponseFuture;
pub use self::layer::LoadShedLayer;
/// A `Service` that sheds load when the inner service isn't ready.
#[derive(Debug)]
pub struct LoadShed<S> {
inner: S,
is_ready: bool,
}
// ===== impl LoadShed =====
impl<S> LoadShed<S> {
/// Wraps a service in `LoadShed` middleware.
pub fn new(inner: S) -> Self {
LoadShed {
inner,
is_ready: false,
}
}
}
impl<S, Req> Service<Req> for LoadShed<S>
where
S: Service<Req>,
S::Error: Into<Error>,
{
type Response = S::Response;
type Error = Error;
type Future = ResponseFuture<S::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// We check for readiness here, so that we can know in `call` if
// the inner service is overloaded or not.
self.is_ready = self.inner.poll_ready().map_err(Into::into)?.is_ready();
// But we always report Ready, so that layers above don't wait until
// the inner service is ready (the entire point of this layer!)
Ok(().into())
}
fn call(&mut self, req: Req) -> Self::Future {
if self.is_ready {
// readiness only counts once, you need to check again!
self.is_ready = false;
ResponseFuture::called(self.inner.call(req))
} else {
ResponseFuture::overloaded()
}
}
}
impl<S: Clone> Clone for LoadShed<S> {
fn clone(&self) -> Self {
LoadShed {
inner: self.inner.clone(),
// new clones shouldn't carry the readiness state, as a cloneable
// inner service likely tracks readiness per clone.
is_ready: false,
}
}
}

View File

@ -0,0 +1,61 @@
extern crate futures;
extern crate tower_load_shed;
extern crate tower_mock;
extern crate tower_service;
use futures::Future;
use tower_load_shed::LoadShed;
use tower_service::Service;
#[test]
fn when_ready() {
let (mut service, mut handle) = new_service();
with_task(|| {
assert!(
service.poll_ready().unwrap().is_ready(),
"overload always reports ready",
);
});
let response = service.call("hello");
let request = handle.next_request().unwrap();
assert_eq!(*request, "hello");
request.respond("world");
assert_eq!(response.wait().unwrap(), "world");
}
#[test]
fn when_not_ready() {
let (mut service, mut handle) = new_service();
handle.allow(0);
with_task(|| {
assert!(
service.poll_ready().unwrap().is_ready(),
"overload always reports ready",
);
});
let fut = service.call("hello");
let err = fut.wait().unwrap_err();
assert!(err.is::<tower_load_shed::error::Overloaded>());
}
type Mock = tower_mock::Mock<&'static str, &'static str>;
type Handle = tower_mock::Handle<&'static str, &'static str>;
fn new_service() -> (LoadShed<Mock>, Handle) {
let (service, handle) = Mock::new();
let service = LoadShed::new(service);
(service, handle)
}
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
use futures::future::{lazy, Future};
lazy(|| Ok::<_, ()>(f())).wait().unwrap()
}