filter: Update error style (#178)

This patch does not migrate tower-filter to use an error type of
Box<Error>. Instead, it defines a new type that optionally contains
a Box<Error>.

The filter layer can be used as part of routing. The router would
sequentially attempt to dispatch a request to an inner service. If
the request is rejected, it attempts the next one. In this case, allocating
a Box<Error> for each attempt is not great.

This strategy still fits within the greater picture. tower_filter::error::Error
implements the error trait, which means that all other layers that take
T: Into<Error> will still work with Filter. Also, only the immediate caller
of Filter should care about rejection errors. In which case, Filter will be
referenced explicitly.

Refs: #131
This commit is contained in:
Carl Lerche 2019-03-01 15:25:21 -08:00 committed by GitHub
parent bf8c3b885a
commit fb01af2ad9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 220 additions and 326 deletions

54
tower-filter/src/error.rs Normal file
View File

@ -0,0 +1,54 @@
//! Error types
use std::error;
use std::fmt;
/// Error produced by `Filter`
#[derive(Debug)]
pub struct Error {
source: Option<Source>,
}
pub(crate) type Source = Box<dyn error::Error + Send + Sync>;
impl Error {
/// Create a new `Error` representing a rejected request.
pub fn rejected() -> Error {
Error { source: None }
}
/// Create a new `Error` representing an inner service error.
pub fn inner<E>(source: E) -> Error
where
E: Into<Source>,
{
Error {
source: Some(source.into()),
}
}
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
if self.source.is_some() {
write!(fmt, "inner service errored")
} else {
write!(fmt, "rejected")
}
}
}
impl error::Error for Error {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
if let Some(ref err) = self.source {
Some(&**err)
} else {
None
}
}
}
pub(crate) mod never {
#[derive(Debug)]
pub enum Never {}
}

View File

@ -0,0 +1,86 @@
//! Future types
use error::{self, Error};
use futures::{Async, Future, Poll};
use tower_service::Service;
/// Filtered response future
#[derive(Debug)]
pub struct ResponseFuture<T, S, Request>
where
S: Service<Request>,
{
/// Response future state
state: State<Request, S::Future>,
/// Predicate future
check: T,
/// Inner service
service: S,
}
#[derive(Debug)]
enum State<Request, U> {
Check(Request),
WaitResponse(U),
Invalid,
}
impl<T, S, Request> ResponseFuture<T, S, Request>
where
T: Future<Error = Error>,
S: Service<Request>,
S::Error: Into<error::Source>,
{
pub(crate) fn new(request: Request, check: T, service: S) -> Self {
ResponseFuture {
state: State::Check(request),
check,
service,
}
}
}
impl<T, S, Request> Future for ResponseFuture<T, S, Request>
where
T: Future<Error = Error>,
S: Service<Request>,
S::Error: Into<error::Source>,
{
type Item = S::Response;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
use self::State::*;
use std::mem;
loop {
match mem::replace(&mut self.state, Invalid) {
Check(request) => {
// Poll predicate
match self.check.poll()? {
Async::Ready(_) => {
let response = self.service.call(request);
self.state = WaitResponse(response);
}
Async::NotReady => {
self.state = Check(request);
return Ok(Async::NotReady);
}
}
}
WaitResponse(mut response) => {
let ret = response.poll().map_err(Error::inner);
self.state = WaitResponse(response);
return ret;
}
Invalid => {
panic!("invalid state");
}
}
}
}
}

31
tower-filter/src/layer.rs Normal file
View File

@ -0,0 +1,31 @@
use error::{self, Error};
use tower_layer::Layer;
use tower_service::Service;
use {Filter, Predicate};
pub struct FilterLayer<U> {
predicate: U,
}
impl<U> FilterLayer<U> {
pub fn new(predicate: U) -> Self {
FilterLayer { predicate }
}
}
impl<U, S, Request> Layer<S, Request> for FilterLayer<U>
where
U: Predicate<Request> + Clone,
S: Service<Request> + Clone,
S::Error: Into<error::Source>,
{
type Response = S::Response;
type Error = Error;
type LayerError = error::never::Never;
type Service = Filter<S, U>;
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
let predicate = self.predicate.clone();
Ok(Filter::new(service, predicate))
}
}

View File

@ -5,288 +5,59 @@ extern crate futures;
extern crate tower_layer;
extern crate tower_service;
use futures::task::AtomicTask;
use futures::{Async, Future, IntoFuture, Poll};
use tower_layer::Layer;
use tower_service::Service;
pub mod error;
pub mod future;
mod layer;
mod predicate;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::{fmt, mem};
pub use layer::FilterLayer;
pub use predicate::Predicate;
use error::Error;
use future::ResponseFuture;
use futures::Poll;
use tower_service::Service;
#[derive(Debug)]
pub struct Filter<T, U> {
inner: T,
predicate: U,
// Tracks the number of in-flight requests
counts: Arc<Counts>,
}
pub struct FilterLayer<U> {
predicate: U,
buffer: usize,
}
pub struct ResponseFuture<T, S, Request>
where
S: Service<Request>,
{
inner: ResponseInner<T, S, Request>,
}
#[derive(Debug)]
struct ResponseInner<T, S, Request>
where
S: Service<Request>,
{
state: State<Request, S::Future>,
check: T,
service: S,
counts: Arc<Counts>,
}
/// Errors produced by `Filter`
#[derive(Debug)]
pub enum Error<T, U> {
/// The predicate rejected the request.
Rejected(T),
/// The inner service produced an error.
Inner(U),
}
/// Checks a request
pub trait Predicate<Request> {
type Error;
type Future: Future<Item = (), Error = Self::Error>;
fn check(&mut self, request: &Request) -> Self::Future;
}
#[derive(Debug)]
struct Counts {
/// Filter::poll_ready task
task: AtomicTask,
/// Remaining capacity
rem: AtomicUsize,
}
#[derive(Debug)]
enum State<Request, U> {
Check(Request),
WaitReady(Request),
WaitResponse(U),
Invalid,
}
// ===== impl Filter =====
impl<U> FilterLayer<U> {
pub fn new(predicate: U, buffer: usize) -> Self {
FilterLayer { predicate, buffer }
}
}
impl<U, S, Request> Layer<S, Request> for FilterLayer<U>
where
U: Predicate<Request> + Clone,
S: Service<Request> + Clone,
{
type Response = S::Response;
type Error = Error<U::Error, S::Error>;
type LayerError = ();
type Service = Filter<S, U>;
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
let predicate = self.predicate.clone();
Ok(Filter::new(service, predicate, self.buffer))
}
}
// ===== impl Filter =====
impl<T, U> Filter<T, U> {
pub fn new<Request>(inner: T, predicate: U, buffer: usize) -> Self
pub fn new<Request>(inner: T, predicate: U) -> Self
where
T: Service<Request> + Clone,
T::Error: Into<error::Source>,
U: Predicate<Request>,
{
let counts = Counts {
task: AtomicTask::new(),
rem: AtomicUsize::new(buffer),
};
Filter {
inner,
predicate,
counts: Arc::new(counts),
}
Filter { inner, predicate }
}
}
impl<T, U, Request> Service<Request> for Filter<T, U>
where
T: Service<Request> + Clone,
T::Error: Into<error::Source>,
U: Predicate<Request>,
{
type Response = T::Response;
type Error = Error<U::Error, T::Error>;
type Error = Error;
type Future = ResponseFuture<U::Future, T, Request>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.counts.task.register();
let rem = self.counts.rem.load(SeqCst);
// TODO: Handle catching upstream closing
if rem == 0 {
Ok(Async::NotReady)
} else {
Ok(().into())
}
self.inner.poll_ready().map_err(error::Error::inner)
}
fn call(&mut self, request: Request) -> Self::Future {
let rem = self.counts.rem.load(SeqCst);
use std::mem;
if rem == 0 {
panic!("service not ready; poll_ready must be called first");
}
// Decrement
self.counts.rem.fetch_sub(1, SeqCst);
let inner = self.inner.clone();
let inner = mem::replace(&mut self.inner, inner);
// Check the request
let check = self.predicate.check(&request);
// Clone the service
let service = self.inner.clone();
// Clone counts
let counts = self.counts.clone();
ResponseFuture {
inner: ResponseInner {
state: State::Check(request),
check,
service,
counts,
},
}
}
}
// ===== impl Predicate =====
impl<F, T, U> Predicate<T> for F
where
F: Fn(&T) -> U,
U: IntoFuture<Item = ()>,
{
type Error = U::Error;
type Future = U::Future;
fn check(&mut self, request: &T) -> Self::Future {
self(request).into_future()
}
}
// ===== impl ResponseFuture =====
impl<T, S, Request> Future for ResponseFuture<T, S, Request>
where
T: Future,
S: Service<Request>,
{
type Item = S::Response;
type Error = Error<T::Error, S::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
impl<T, S, Request> fmt::Debug for ResponseFuture<T, S, Request>
where
T: fmt::Debug,
S: Service<Request> + fmt::Debug,
S::Future: fmt::Debug,
Request: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("ResponseFuture")
.field("inner", &self.inner)
.finish()
}
}
// ===== impl ResponseInner =====
impl<T, S, Request> ResponseInner<T, S, Request>
where
T: Future,
S: Service<Request>,
{
fn inc_rem(&self) {
if 0 == self.counts.rem.fetch_add(1, SeqCst) {
self.counts.task.notify();
}
}
fn poll(&mut self) -> Poll<S::Response, Error<T::Error, S::Error>> {
use self::State::*;
loop {
match mem::replace(&mut self.state, Invalid) {
Check(request) => {
// Poll predicate
match self.check.poll() {
Ok(Async::Ready(_)) => {
self.state = WaitReady(request);
}
Ok(Async::NotReady) => {
self.state = Check(request);
return Ok(Async::NotReady);
}
Err(e) => {
return Err(Error::Rejected(e));
}
}
}
WaitReady(request) => {
// Poll service for readiness
match self.service.poll_ready() {
Ok(Async::Ready(_)) => {
self.inc_rem();
let response = self.service.call(request);
self.state = WaitResponse(response);
}
Ok(Async::NotReady) => {
self.state = WaitReady(request);
return Ok(Async::NotReady);
}
Err(e) => {
self.inc_rem();
return Err(Error::Inner(e));
}
}
}
WaitResponse(mut response) => {
let ret = response.poll().map_err(Error::Inner);
self.state = WaitResponse(response);
return ret;
}
Invalid => {
panic!("invalid state");
}
}
}
ResponseFuture::new(request, check, inner)
}
}

View File

@ -0,0 +1,21 @@
use error::Error;
use futures::{Future, IntoFuture};
/// Checks a request
pub trait Predicate<Request> {
type Future: Future<Item = (), Error = Error>;
fn check(&mut self, request: &Request) -> Self::Future;
}
impl<F, T, U> Predicate<T> for F
where
F: Fn(&T) -> U,
U: IntoFuture<Item = (), Error = Error>,
{
type Future = U::Future;
fn check(&mut self, request: &T) -> Self::Future {
self(request).into_future()
}
}

View File

@ -5,16 +5,15 @@ extern crate tower_service;
extern crate tower_util;
use futures::*;
use tower_filter::*;
use tower_filter::error::Error;
use tower_filter::Filter;
use tower_service::*;
use tower_util::ServiceExt;
use std::sync::mpsc;
use std::thread;
#[test]
fn passthrough_sync() {
let (mut service, mut handle) = new_service(10, |_| Ok::<_, ()>(()));
let (mut service, mut handle) = new_service(|_| Ok(()));
let th = thread::spawn(move || {
// Receive the requests and respond
@ -48,89 +47,21 @@ fn passthrough_sync() {
#[test]
fn rejected_sync() {
let (mut service, _handle) = new_service(10, |_| Err::<(), _>(()));
let (mut service, _handle) = new_service(|_| Err(Error::rejected()));
let response = service.call("hello".into()).wait();
assert!(response.is_err());
}
#[test]
fn saturate() {
use futures::stream::FuturesUnordered;
let (mut service, mut handle) = new_service(1, |_| Ok::<_, ()>(()));
with_task(|| {
// First request is ready
assert!(service.poll_ready().unwrap().is_ready());
});
let mut r1 = service.call("one".into());
with_task(|| {
// Second request is not ready
assert!(service.poll_ready().unwrap().is_not_ready());
});
let mut futs = FuturesUnordered::new();
futs.push(service.ready());
let (tx, rx) = mpsc::channel();
// Complete the request in another thread
let th1 = thread::spawn(move || {
with_task(|| {
assert!(r1.poll().unwrap().is_not_ready());
tx.send(()).unwrap();
let response = r1.wait().unwrap();
assert_eq!(response.as_str(), "resp-one");
});
});
rx.recv().unwrap();
// The service should be ready
let mut service = with_task(|| match futs.poll().unwrap() {
Async::Ready(Some(s)) => s,
Async::Ready(None) => panic!("None"),
Async::NotReady => panic!("NotReady"),
});
let r2 = service.call("two".into());
let th2 = thread::spawn(move || {
let response = r2.wait().unwrap();
assert_eq!(response.as_str(), "resp-two");
});
let request = handle.next_request().unwrap();
assert_eq!("one", request.as_str());
request.respond("resp-one".into());
let request = handle.next_request().unwrap();
assert_eq!("two", request.as_str());
request.respond("resp-two".into());
th1.join().unwrap();
th2.join().unwrap();
}
type Mock = tower_mock::Mock<String, String>;
type Handle = tower_mock::Handle<String, String>;
fn new_service<F, U>(max: usize, f: F) -> (Filter<Mock, F>, Handle)
fn new_service<F, U>(f: F) -> (Filter<Mock, F>, Handle)
where
F: Fn(&String) -> U,
U: IntoFuture<Item = ()>,
U: IntoFuture<Item = (), Error = Error>,
{
let (service, handle) = Mock::new();
let service = Filter::new(service, f, max);
let service = Filter::new(service, f);
(service, handle)
}
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
use futures::future::{lazy, Future};
lazy(|| Ok::<_, ()>(f())).wait().unwrap()
}